Parking Finder
The Parking Finder system is a smart parking solution designed to help drivers efficiently locate available parking spaces at busy locations, such as a university campus.
find_distance.py File Reference

@brief Smart parking solution designed to help drivers efficiently locate available parking spaces at busy locations, such as a university campus. More...

Go to the source code of this file.

Classes

class  find_distance.ROIEditor
 Interactive polygon editor for defining parking spot ROIs at runtime. More...
class  find_distance.CalibWindow
 Live calibration window with OpenCV trackbars for tuning CALIB_PARAMS. More...

Functions

 find_distance.set_jetson_clocks (bool enable)
 Lock Jetson hardware clocks at maximum frequency for consistent inference latency.
 find_distance.parse_args ()
 Parse command-line arguments.
np.ndarray find_distance.shm_ndarray (SharedMemory shm, tuple shape)
 Create a NumPy array view backed by a SharedMemory block.
 find_distance.get_detections ()
 Flask REST endpoint — returns current parking spot occupancy states.
 find_distance.check_parking_spots (str cam_name, list disp_boxes)
 Determine occupancy state for each parking spot ROI on a given camera.
 find_distance.annotate_frame (np.ndarray frame, list disp_boxes, list scores, list names, str cam_name)
 Draw YOLO detection boxes and confidence labels directly onto a frame (in-place).
np.ndarray find_distance.draw_spot_overlays (np.ndarray frame, str cam_name)
 Blend semi-transparent ROI polygon overlays onto a camera frame.
list[dict] find_distance.generate_rois (str cam_id, tuple vanishing_point, int near_y, int far_y, int left_x, int right_x, int n_spots=5, int n_rows=1, float fisheye_distortion=0.0, float perspective_strength=1.0, str name=None, list spot_boundaries=None, **kwargs)
 Procedurally generate perspective-correct parking spot ROI polygons.
list[dict] find_distance.auto_caliberate_rois (str cam_name, np.ndarray frame, dict current_params, list detections=None)
 Automatically recalibrate ROI polygons for a camera using edge detection.
 find_distance.rebuild_spot_masks (str cam_name, list[dict] new_rois, bool reset_states=False)
 Rebuild the in-memory ROI and mask tables for a camera after calibration.
 find_distance.capture_worker (int cam_id, int src, str raw_shm_name, raw_lock, raw_frame_id, frame_ready_event, stop_event)
 Camera capture worker — runs as a separate Process per camera.
 find_distance.inference_loop (bool need_annotations, frame_ready_event, stop_event)
 YOLO inference thread — batches frames from all cameras and runs detection.
 find_distance.display_loop (stop_event)
 Display loop — renders the live camera feed with overlays in a GUI window.
list[float] find_distance.blend_boundaries (list[float] detected, list[float] manual, float manual_weight=0.15)
 Blend auto-detected spot boundaries with manually set ones using a weighted average.
 find_distance.record_loop (stop_event, use_anns=False)
 Recording loop — writes camera frames to timestamped MP4 files.
 find_distance.auto_calibrate_loop (stop_event)
 Background auto-calibration thread — periodically re-runs ROI calibration.
list[float] find_distance.merge_narrow_spots (list[float] boundaries, float min_width=80.0)
 Remove spot boundaries that would produce slots narrower than min_width pixels.
tuple[list[float], int] find_distance.detect_spot_boundaries (np.ndarray frame, int near_y, int far_y, int left_x, int right_x, int n_spots, list detections=None, str cam_name=None)
 Detect parking spot divider X positions from lane markings in a camera frame.
 find_distance._try_from_cars (detections, near_y, left_x, right_x, n_spots, fallback)
 Fallback boundary estimator that infers spot dividers from car center positions.
 find_distance.shutdown ()
 atexit handler — gracefully shuts down all workers and frees shared memory.

Variables

 find_distance.benchmark
 find_distance.enabled
str find_distance.JETSON_CLOCKS_CONF = '/tmp/jetson_clocks_backup.conf'
 Temporary file path used to store the original jetson clock state before locking.
int find_distance.CAP_W = 640
 Capture frame width in pixels.
int find_distance.CAP_H = 480
 Capture frame height in pixels.
int find_distance.DISP_W = 640
 Display frame width in pixels.
int find_distance.DISP_H = 480
 Display frame height in pixels.
tuple find_distance.CAP_SHAPE = (CAP_H, CAP_W, 3)
 NumPy shape tuple for a raw capture frame (H, W, 3).
tuple find_distance.DISP_SHAPE = (DISP_H, DISP_W, 3)
 NumPy shape tuple for a display frame (H, W, 3).
list find_distance.SOURCES = ['./recordings/left_side_cam.mp4', './recordings/right_side_ccam.mp4']
 Video sources for each camera.
int find_distance.CAPTURE_FPS = 30
 Target capture framerate for live cameras and recording output.
int find_distance.INFERENCEFPS = 15
 Maximum inference framerate.
int find_distance.IMGSZ = 128
 Input image size (square) fed to the YOLO model in pixels.
float find_distance.CONF = 0.25
 YOLO detection confidence threshold.
str find_distance.MODEL_PATH = "./models/yolo11n.engine"
 Path to the YOLO model file.
int find_distance.MAX_BATCH = 3
 Maximum batch size passed to the YOLO model per inference call.
list find_distance.CLASSES = [2, 3, 5, 7]
 COCO class IDs to detect.
list find_distance.CAM_ORDER = ['Left', 'Right']
 Ordered list of camera name strings.
list find_distance.DISPLAY_ORDER = [0, 1]
 Order in which camera panels are stitched horizontally in the display window.
list find_distance.INF_IDX = [0, 1]
 Camera indices that are sent to the inference pipeline.
set find_distance.SPOT_CAMS = {'Left', 'Right'}
 Set of camera names that have parking spot ROIs defined and should be checked for occupancy.
dict find_distance.MIN_BOX_H = {'Left': 30, 'Right': 30}
 Per camera minimum bounding box height in pixels.
float find_distance.INTERSECT_ALLOWANCE = 0.15
 Minimum fraction of a spot's mask area that a bounding box must overlap to mark the spot as occupied.
int find_distance.AUTO_CALIBRATE_INTERVAL = 0
 Seconds between automatic ROI recalibration passes.
dict find_distance.ROIS
 Per camera list of parking spot definitions.
dict find_distance.SPOT_MASK = {}
 Per camera list of boolean NumPy arrays (DISP_H x DISP_W) pre-rasterised from ROIS.
dict find_distance.CALIB_PARAMS
 Per camera calibration parameters used by generate_rois() and auto_caliberate_rois().
 find_distance.m = np.zeros((DISP_H, DISP_W), dtype=np.uint8)
tuple find_distance.OCCUPIED_COLOR = (0, 0, 220)
 BGR colour used to fill occupied spot overlays (red tint).
tuple find_distance.EMPTY_COLOR = (0, 220, 80)
 BGR colour used to fill empty spot overlays (green tint).
float find_distance.SPOT_ALPHA = 0.25
 Opacity of the spot overlay blend.
 find_distance.store_lock = threading.Lock()
 Threading lock protecting all shared state variables (spot_states, _last_boxes, etc.) from concurrent reads/writes by the inference and display threads.
dict find_distance.spot_states
 Per-camera list of occupancy strings for each spot.
list find_distance.RAW_SHM_OBJS = []
 SharedMemory objects holding raw (unannotated) camera frames.
list find_distance.ANN_SHM_OBJS = []
 SharedMemory objects holding annotated frames written by the inference thread.
list find_distance.RAW_BUFS = []
 NumPy arrays mapped onto RAW_SHM_OBJS for zero copy frame access.
list find_distance.ANN_BUFS = []
 NumPy arrays mapped onto ANN_SHM_OBJS for zero copy annotated frame access.
list find_distance.ANN_LOCKS = []
 Per camera multiprocessing Locks protecting ANN_BUFS entries.
list find_distance.RAW_LOCKS = []
 Per-camera multiprocessing Locks protecting RAW_BUFS entries.
list find_distance.RAW_FRAME_ID = []
 Per camera shared Value('i') counters incremented each time a new frame is written.
list find_distance.PROCESSES = []
 List of capture worker Process objects, kept for clean shutdown.
int find_distance._MAIN_PID = 0
 PID of the main process.
 find_distance.STOP_EVENT = Event()
 Multiprocessing Event shared with all worker processes and threads.
dict find_distance._last_boxes = {cam: [] for cam in CAM_ORDER}
 Per camera cache of the most recent raw bounding boxes (display coordinates).
dict find_distance._last_ann_boxes = {cam: [] for cam in CAM_ORDER}
 Per camera cache of (box, score, name) tuples from the last inference pass.
dict find_distance._empty_streak = {cam: [] for cam in CAM_ORDER}
 Per camera, per spot counter of consecutive inference frames with no detection.
dict find_distance._manual_bounds = {cam: [] for cam in CAM_ORDER}
 Per camera list of manually set spot boundary X positions from the CalibWindow.
dict find_distance._debug_frames = {}
 Per camera debug visualisation frames showing detected edges and boundary lines.
int find_distance.EMPTY_CONFIRM_FRAMES = 3
 Number of consecutive inference frames a spot must appear empty before its state is changed from 'Car' to 'Empty'.
 find_distance.app = Flask(__name__)
 find_distance.roi_editor = ROIEditor()
 find_distance.calib_window = CalibWindow()
 find_distance.args = parse_args()
 find_distance.frame_ready_event = Event()
 find_distance.need_annotation = args.test or args.record
float find_distance.usb_1 = 2.1
float find_distance.usb_2 = 2.2
 find_distance.line = f.readline()
 find_distance.port = float(re.search(r'\d+\.\d+', line).group())
 find_distance.idx = int(re.search(r'\d+', line).group())
 find_distance.probe = cv2.VideoCapture(src)
 find_distance.ok
 find_distance._
 find_distance.raw = SharedMemory(create=True, size=CAP_H * CAP_W * 3)
 find_distance.ann = SharedMemory(create=True, size=DISP_H * DISP_W * 3)
 find_distance.p
int find_distance.deadline = time.time() + 15
 find_distance.auto_calibrate_t
 find_distance.inf_t
 find_distance.target
 find_distance.daemon
 find_distance.rec_t = Thread(target=record_loop, args=(STOP_EVENT, args.annotate), daemon=True)

Detailed Description

@brief Smart parking solution designed to help drivers efficiently locate available parking spaces at busy locations, such as a university campus.

Captures frames from two cameras (left and right), runs batched YOLO inference to detect vehicles, and determines parking spot occupancy by checking bounding box overlap against pre-defined ROI polygons. Results are streamed via a Flask REST API, with optional visualization and recording functionality.

Usage

python find_distance.py # Headless mode: Flask API only
python find_distance.py -T, -t, --test # Display window for visual testing
python find_distance.py -r,--record # Record raw frames to disk
python find_distance.py -T,-r # Display and record simultaneously
python find_distance.py -r -a # Record with detection annotations

CPU Core Affinity Layout

Machine: Jetson Orin Nano

CPU core layout:
core 0 - Main / Flask API
core 1 - Camera 0 capture process
core 2 - Camera 1 capture process
core 3 - Auto-calibration thread
core 4 - YOLO inference thread
core 5 - Display / Record thread
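The pinning scheme above can be sketched with os.sched_setaffinity (Linux-only). Note that the CORE_MAP dict and pin_to_core helper below are illustrative names, not part of find_distance.py:

```python
import os

# Core assignments from the layout above (Jetson Orin Nano, 6 cores).
CORE_MAP = {
    'main': 0,       # Main process / Flask API
    'cam0': 1,       # Camera 0 capture process
    'cam1': 2,       # Camera 1 capture process
    'calib': 3,      # Auto-calibration thread
    'inference': 4,  # YOLO inference thread
    'display': 5,    # Display / Record thread
}

def pin_to_core(role: str) -> None:
    # os.sched_setaffinity(0, ...) pins the *calling* process/thread;
    # wrapped in try/except so non-Linux hosts (or machines with fewer
    # cores) degrade gracefully, mirroring the `except Exception: pass`
    # pattern used throughout the source.
    try:
        os.sched_setaffinity(0, {CORE_MAP[role]})
    except (AttributeError, OSError):
        pass
```

The source pins the capture processes from the main process after spawning (os.sched_setaffinity also accepts a child PID as its first argument), while threads such as auto_calibrate_loop pin themselves with pid 0, as in the listing further down.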

ROI Editor (requires -T / --test)

Key          Action
I            Toggle editor on/off
Left click   Add point to current polygon
Right click  Undo last point
Enter        Finish polygon and print coordinates to terminal
Backspace    Clear all points for current polygon
Tab          Cycle active camera (Left / Right)
Q            Quit
Warning
Always read the printed warnings
Note
TensorRT export command:
yolo export model=yolo11n.pt format=engine half=true device=0 imgsz=128 batch=3

Definition in file find_distance.py.

Function Documentation

◆ _try_from_cars()

find_distance._try_from_cars ( detections,
near_y,
left_x,
right_x,
n_spots,
fallback )
protected

Fallback boundary estimator that infers spot dividers from car center positions.

Used when lane marking detection finds too few divider lines. Takes the horizontal centers of detected cars that are close to near_y and places divider boundaries at the midpoints between adjacent cars. Falls back to a uniform layout (after merge_narrow_spots cleaning) if fewer than 2 car centers are available or if the resulting boundary count does not match n_spots+1.

Parameters
detections  List of [x1,y1,x2,y2] detection boxes, or None.
near_y      Y pixel of the near edge used to filter relevant detections.
left_x      Left boundary X pixel.
right_x     Right boundary X pixel.
n_spots     Expected number of parking spots.
fallback    Uniform boundary list used when car-based estimation fails.
Returns
Tuple of (boundaries, n_spots).

Definition at line 1857 of file find_distance.py.

1857def _try_from_cars(detections, near_y, left_x, right_x, n_spots, fallback):
1858 if not detections:
1859 fallback = merge_narrow_spots(fallback)
1860 return fallback, len(fallback) - 1
1861
1862 car_centers = []
1863 for x1, y1, x2, y2 in detections:
1864 if abs(y2 - near_y) < 80:
1865 car_centers.append((x1 + x2) / 2)
1866
1867 car_centers = sorted(c for c in car_centers if left_x < c < right_x)
1868
1869 if len(car_centers) < 2:
1870 fallback = merge_narrow_spots(fallback)
1871 return fallback, len(fallback) - 1
1872
1873 boundaries = [left_x]
1874 for i in range(len(car_centers) - 1):
1875 boundaries.append((car_centers[i] + car_centers[i + 1]) / 2)
1876 boundaries.append(right_x)
1877
1878 if len(boundaries) == n_spots + 1:
1879 boundaries = merge_narrow_spots(boundaries)
1880 return boundaries, len(boundaries) - 1
1881 fallback = merge_narrow_spots(fallback)
1882 return [(x, x) for x in fallback], len(fallback) - 1
1883

◆ annotate_frame()

find_distance.annotate_frame ( np.ndarray frame,
list disp_boxes,
list scores,
list names,
str cam_name )

Draw YOLO detection boxes and confidence labels directly onto a frame (in-place).

Iterates over the paired box/score/name lists and draws a filled rectangle label above each bounding box. Boxes shorter than MIN_BOX_H are skipped to ignore noise from distant or partial detections. Modifies the frame array in-place to avoid an extra memory allocation.

Parameters
frame       BGR uint8 NumPy array to annotate. Modified in-place.
disp_boxes  List of [x1, y1, x2, y2] bounding boxes in display coordinates.
scores      Confidence score (float) for each box.
names       COCO class name string for each box (e.g. "car", "truck").
cam_name    Camera identifier used to look up the MIN_BOX_H threshold.
    Draw annotations directly on frames; no copy means less memory overhead.

Definition at line 430 of file find_distance.py.

430def annotate_frame(frame: np.ndarray, disp_boxes: list, scores: list, names: list, cam_name: str):
431 """
432 Draw annotation directly on frames, no copy less memory overhead
433 """
434 min_h = MIN_BOX_H.get(cam_name, 0)
435 for box, score, name in zip(disp_boxes, scores, names):
436 x1, y1, x2, y2 = box
437 if (y2 - y1) < min_h:
438 continue
439 x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
440
441 color = (0, 150, 150)
442 cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
443
444 label = f'{name} {score:.2f}'
445 (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_COMPLEX, 0.5, 1)
446 cv2.rectangle(frame, (x1, y1 - th - 8), (x1 + tw + 6, y1), color, -1)
447 cv2.putText(frame, label, (x1 + 3, y1 - 4), cv2.FONT_HERSHEY_COMPLEX, 0.5, (15, 15, 15), 1, cv2.LINE_AA)
448

◆ auto_caliberate_rois()

list[dict] find_distance.auto_caliberate_rois ( str cam_name,
np.ndarray frame,
dict current_params,
list detections = None )

Automatically recalibrate ROI polygons for a camera using edge detection.

Analyses a snapshot frame using the following pipeline:

  1. Converts to greyscale and applies Canny edge detection.
  2. Uses Probabilistic Hough Transform to find line segments.
  3. Filters out horizontal lines near known car bounding box edges to avoid using car rooftops/hoods as calibration references.
  4. Clusters remaining horizontal lines to estimate the far_y boundary.
  5. Clamps far_y drift to ±50px of the original value to prevent wild jumps.
  6. Detects near-vertical lines and computes their pairwise intersections to estimate the vanishing point.
  7. Calls detect_spot_boundaries() to find individual spot divider positions, optionally blending in any manually set boundaries from _manual_bounds.
  8. Calls generate_rois() with the updated parameters to produce new polygons.

Updates CALIB_PARAMS in-place and syncs the CalibWindow trackbars if open. Returns None and prints a warning if insufficient lines are detected.

Parameters
cam_name        Camera identifier string, e.g. "Left".
frame           BGR display-resolution frame to analyse.
current_params  Current CALIB_PARAMS dict for this camera.
detections      Optional list of [x1,y1,x2,y2] boxes to exclude car edges from the line analysis.
Returns
List of new ROI dicts (same format as generate_rois()), or None on failure.

Definition at line 649 of file find_distance.py.

649def auto_caliberate_rois(cam_name: str, frame: np.ndarray, current_params: dict, detections: list = None) -> list[dict]:
650 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
651 blur = cv2.GaussianBlur(gray, (5, 5), 0)
652 edges = cv2.Canny(blur, 30, 100)
653
654 roi_mask = np.zeros_like(edges)
655 roi_mask[frame.shape[0] // 3:, :] = 255
656 edges = cv2.bitwise_and(edges, roi_mask)
657 lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=60, minLineLength=80, maxLineGap=20)
658 left_x = current_params['left_x']
659 right_x = current_params['right_x']
660 near_y = current_params['near_y']
661 far_y_ref = current_params.get('far_y', 0)
662 if lines is None or len(lines) < 4:
663 print(f"\033[1;91mWarning: [Auto Caliberation] Not enough lines detected, keeping current ROIS\033[0m")
664 return None
665
666 car_y_exclusions = []
667 if detections:
668 for x1, y1, x2, y2 in detections:
669 car_cx = (x1 + x2) / 2
670 car_cy = (y1 + y2) / 2
671 if not (left_x <= car_cx <= right_x and far_y_ref <= car_cy <= near_y):
672 continue
673 pad = 15
674 car_y_exclusions.append((int(y1) - pad, int(y1) + pad))
675 car_y_exclusions.append((int(y2) - pad, int(y2) + pad))
676
677 def is_near_car_edge(y):
678 for low, hi in car_y_exclusions:
679 if low <= y <= hi:
680 return True
681 return False
682
683 h_lines = []
684 for l in lines:
685 x1, y1, x2, y2 = l[0]
686 angle = abs(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
687 if angle < 20 or angle > 160:
688 mid_y = (y1 + y2) / 2
689 if not is_near_car_edge(mid_y):
690 h_lines.append((x1, y1, x2, y2))
691
692 if len(h_lines) < 2:
693 print(f"\033[1;91mWarning: [Auto Caliberation] Not enough horizontal lines, keeping current ROIS\033[0m")
694 return None
695
696 ys = np.array([(y1 + y2) / 2 for x1, y1, x2, y2 in h_lines])
697 ys_sorted = np.sort(ys)
698
699 gaps = np.diff(ys_sorted)
700 big_gaps = np.where(gaps > 20)[0]
701
702 cluster_centers = []
703 prev = 0
704 for g in big_gaps:
705 cluster_centers.append(np.mean(ys_sorted[prev:g + 1]))
706 prev = g + 1
707 cluster_centers.append(np.mean(ys_sorted[prev:]))
708
709 if len(cluster_centers) < 2:
710 print(f"\033[1;91mWarning: [Auto Caliberation] Could not find row boundaries\033[0m")
711 return None
712
713 min_far_y = near_y - 250
714 valid_clusters = [c for c in cluster_centers if min_far_y < c < near_y]
715 if not valid_clusters:
716 print(f"\033[1;91mWarning: [Auto Caliberation] no valid far_y found\033[0m")
717 return None
718 far_y = int(min(valid_clusters, key=lambda c: near_y - c))
719
720 n_spots = current_params.get('n_spots', 5)
721 n_rows = current_params.get('n_rows', 1)
722 distortion = current_params.get('fisheye_distortion', 0.0)
723 strength = current_params.get('perspective_strength', 1.0)
724
725 MIN_ROI_HEIGHT = current_params.get('min_roi_height', 60)
726
727 if near_y - far_y < MIN_ROI_HEIGHT:
728 far_y = near_y - MIN_ROI_HEIGHT
729 print(f"\033[1;93mWarning: [Auto Caliberation] far_y too close to near_y, clamped to {far_y}\033[0m")
730
731 far_y = max(0, far_y)
732 original_far_y = CALIB_PARAMS[cam_name].get('far_y', far_y)
733 max_drift = 50
734 far_y = max(far_y, original_far_y - max_drift)
735 far_y = min(far_y, original_far_y + max_drift)
736 vanish_lines = []
737 for l in lines[:]:
738 x1, y1, x2, y2 = l[0]
739 angle = abs(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
740 if 10 < angle < 80 or 100 < angle < 170:
741 vanish_lines.append((x1, y1, x2, y2))
742
743 vanishing_point = current_params['vanishing_point']
744
745 if len(vanish_lines) >= 2:
746 intersections = []
747 for i in range(min(len(vanish_lines), 8)):
748 for j in range(i + 1, min(len(vanish_lines), 8)):
749 x1, y1, x2, y2 = vanish_lines[i]
750 x3, y3, x4, y4 = vanish_lines[j]
751 d = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
752 if abs(d) < 1e-6:
753 continue
754 t = ((x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)) / d
755 ix = x1 + t * (x2 - x1)
756 iy = y1 + t * (y2 - y1)
757 if 0 < ix < DISP_W and 0 < iy < far_y + 20:
758 intersections.append((ix, iy))
759
760 if intersections:
761 vanishing_point = (
762 int(np.median([p[0] for p in intersections])),
763 int(np.median([p[1] for p in intersections])))
764
765 CALIB_PARAMS[cam_name]['far_y'] = far_y
766 CALIB_PARAMS[cam_name]['vanishing_point'] = vanishing_point
767
768 if calib_window.open and calib_window.active_cam == cam_name:
769 cv2.setTrackbarPos('Far Y', CalibWindow.WIN, far_y)
770 cv2.setTrackbarPos('VP X', CalibWindow.WIN, vanishing_point[0])
771 cv2.setTrackbarPos('VP Y', CalibWindow.WIN, vanishing_point[1])
772
773 roi_detections = [
774 box for box in (detections or [])
775 if left_x <= (box[0] + box[2]) / 2 <= right_x
776 and far_y <= (box[1] + box[3]) / 2 <= near_y
777 ]
778 boundaries, detected_n_spots = detect_spot_boundaries(frame, near_y, far_y, left_x, right_x, n_spots,
779 detections=roi_detections, cam_name=cam_name)
780 manual = _manual_bounds.get(cam_name, [])
781 if manual:
782 boundaries = blend_boundaries(boundaries, manual, manual_weight=0.15)
783
784 return generate_rois(cam_id=cam_name, vanishing_point=vanishing_point, near_y=near_y, far_y=far_y, left_x=left_x,
785 right_x=right_x, n_spots=detected_n_spots, n_rows=n_rows, fisheye_distortion=distortion,
786 perspective_strength=strength, spot_boundaries=boundaries)
787

◆ auto_calibrate_loop()

find_distance.auto_calibrate_loop ( stop_event)

Background auto-calibration thread — periodically re-runs ROI calibration.

Runs as a daemon Thread pinned to CPU core 3. Sleeps for AUTO_CALIBRATE_INTERVAL seconds between passes (set to 0 to disable). On each pass it iterates over all SPOT_CAMS, skips cameras where the CalibWindow is open (to avoid fighting the user's manual adjustments), and calls auto_caliberate_rois() on the latest snapshot. If calibration succeeds, rebuild_spot_masks() is called to apply the new ROIs immediately.

Skips cameras with more than 3 simultaneous detections, as a heavily occupied lot provides poor lane-marking signal for edge detection.

Parameters
stop_event  Multiprocessing Event polled to exit the loop.

Definition at line 1653 of file find_distance.py.

1653def auto_calibrate_loop(stop_event):
1654 try:
1655 os.sched_setaffinity(0, {3})
1656 except Exception:
1657 pass
1658
1659 print('[Auto Calibrate] Background Calibration thread started')
1660 while not stop_event.is_set():
1661 time.sleep(AUTO_CALIBRATE_INTERVAL)
1662
1663 if stop_event.is_set():
1664 break
1665
1666 for idx, cam_name in enumerate(CAM_ORDER):
1667 if cam_name not in SPOT_CAMS:
1668 continue
1669
1670 if calib_window.suppress_auto and calib_window.active_cam == cam_name:
1671 continue
1672 try:
1673 with RAW_LOCKS[idx]:
1674 snap = RAW_BUFS[idx].copy()
1675
1676 snap_disp = cv2.resize(snap, (DISP_W, DISP_H), interpolation=cv2.INTER_NEAREST)
1677
1678 with store_lock:
1679 boxes = _last_boxes.get(cam_name, [])
1680
1681 if len(boxes) > 3:
1682 print(f'[Auto Calibrate] {cam_name}: skipping --- too many detections ({len(boxes)})')
1683 continue
1684
1685 new_rois = auto_caliberate_rois(cam_name, snap_disp, CALIB_PARAMS[cam_name], boxes)
1686
1687 if new_rois:
1688 rebuild_spot_masks(cam_name, new_rois)
1689 print(f'[Auto Calibrate] {cam_name}: updated')
1690
1691 except Exception as e:
1692 print(f'[Auto Calibrate] {cam_name} error: {e}')
1693

◆ blend_boundaries()

list[float] find_distance.blend_boundaries ( list[float] detected,
list[float] manual,
float manual_weight = 0.15 )

Blend automatically detected spot boundaries with manually set ones using a weighted average (manual positions contribute manual_weight of the blended near-edge X).

Definition at line 1421 of file find_distance.py.

1421def blend_boundaries(detected: list[float], manual: list[float], manual_weight: float = 0.15) -> list[float]:
1422 if not manual or len(detected) != len(manual):
1423 return detected
1424 result = []
1425 for d, m in zip(detected, manual):
1426 dn = d[0] if isinstance(d, tuple) else d
1427 df = d[1] if isinstance(d, tuple) else d
1428 mn = m[0] if isinstance(m, tuple) else m
1429
1430 angle_offset = df - dn
1431 blended_near = dn * (1 - manual_weight) + mn * manual_weight
1432 result.append((blended_near, blended_near + angle_offset))
1433 return result
1434
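A quick worked example of the blending (the function body is restated from the listing above so the snippet runs on its own; the boundary values are made-up sample data):

```python
def blend_boundaries(detected, manual, manual_weight=0.15):
    # Restated from the listing above for a self-contained example.
    if not manual or len(detected) != len(manual):
        return detected
    result = []
    for d, m in zip(detected, manual):
        dn = d[0] if isinstance(d, tuple) else d   # detected near-edge X
        df = d[1] if isinstance(d, tuple) else d   # detected far-edge X
        mn = m[0] if isinstance(m, tuple) else m   # manual near-edge X
        angle_offset = df - dn                     # keep the divider's tilt
        blended_near = dn * (1 - manual_weight) + mn * manual_weight
        result.append((blended_near, blended_near + angle_offset))
    return result

detected = [(100.0, 110.0), (200.0, 215.0)]  # auto-detected (near_x, far_x) pairs
manual = [104.0, 190.0]                      # near-edge X positions from CalibWindow
print(blend_boundaries(detected, manual, manual_weight=0.25))
# -> [(101.0, 111.0), (197.5, 212.5)]
```

Only the near-edge position is blended; the far-edge offset is carried over unchanged, so a manual tweak shifts a divider sideways without altering its angle.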

◆ capture_worker()

find_distance.capture_worker ( int cam_id,
int src,
str raw_shm_name,
raw_lock,
raw_frame_id,
frame_ready_event,
stop_event )

Camera capture worker — runs as a separate Process per camera.

Opens the video source (V4L2 device or file), configures resolution and FPS, then loops reading frames and writing them into the shared memory buffer. After each write it increments raw_frame_id and sets frame_ready_event to immediately wake the inference thread without polling.

For live cameras, YUYV format and a buffer size of 1 are set to always deliver the most recent frame. For file sources, the video rewinds at EOF. The process ignores SIGINT so Ctrl+C is handled only by the main process via STOP_EVENT.

Pinned to CPU core (cam_id + 1) by the main process after spawning.

Parameters
cam_id             Zero-based camera index.
src                V4L2 device index (int) or video file path (str).
raw_shm_name       Name of the shared memory block to write frames into.
raw_lock           Multiprocessing Lock protecting the shared buffer.
raw_frame_id       Shared Value('i') incremented on each new frame.
frame_ready_event  Multiprocessing Event set after each frame write.
stop_event         Multiprocessing Event polled to trigger shutdown.

Definition at line 1065 of file find_distance.py.

1065def capture_worker(cam_id: int, src: int, raw_shm_name: str, raw_lock, raw_frame_id, frame_ready_event, stop_event):
1066 signal.signal(signal.SIGINT, signal.SIG_IGN)
1067 os.environ['OPENBLAS_NUM_THREADS'] = '2'
1068 os.environ['MALLOC_TRIM_THRESHOLD_'] = '100000'
1069
1070 # cap = cv2.VideoCapture(src, cv2.CAP_V4L2)
1071 for attempt in range(5):
1072 cap = cv2.VideoCapture(src)
1073 if cap.isOpened():
1074 break
1075 time.sleep(0.2)
1076
1077 if not cap.isOpened():
1078 print(f"[CAM {cam_id}] Failed to open source {src}")
1079 return
1080
1081 is_live = isinstance(src, int)
1082
1083 if is_live:
1084 cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'YUYV'))
1085 cap.set(cv2.CAP_PROP_FPS, CAPTURE_FPS)
1086 cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # only keep most recent frame
1087
1088 cap.set(cv2.CAP_PROP_FRAME_WIDTH, CAP_W)
1089 cap.set(cv2.CAP_PROP_FRAME_HEIGHT, CAP_H)
1090
1091 actual_fps = cap.get(cv2.CAP_PROP_FPS)
1092 if actual_fps <= 0:
1093 actual_fps = CAPTURE_FPS
1094
1095 frame_delay = 1.0 / actual_fps
1096 print(f"[cam {cam_id}] Video FPS detected: {actual_fps}, frame_delay: {frame_delay:.4f}s")
1097
1098 print(f'[cam {cam_id}] {int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))}x'
1099 f'{int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))} '
1100 f'@ {cap.get(cv2.CAP_PROP_FPS):.0f} FPS')
1101
1102 shm = SharedMemory(name=raw_shm_name)
1103 buf = shm_ndarray(shm, CAP_SHAPE)
1104
1105 local_id = 0
1106 last_frame_time = time.perf_counter()
1107
1108 while not stop_event.is_set():
1109
1110 now = time.perf_counter()
1111 elapsed = now - last_frame_time
1112 sleep_time = frame_delay - elapsed
1113 if sleep_time > 0.001:
1114 time.sleep(sleep_time)
1115 last_frame_time = time.perf_counter()
1116
1117 ok, frame = cap.read()
1118 if not ok:
1119 if not is_live:
1120 cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
1121 last_frame_time = time.perf_counter()
1122 continue
1123 print(f"[cam {cam_id}] read failed")
1124 break
1125
1126 if frame.shape[:2] != (CAP_H, CAP_W):
1127 frame = cv2.resize(frame, (CAP_W, CAP_H), interpolation=cv2.INTER_NEAREST)
1128
1129 with raw_lock:
1130 np.copyto(buf, frame)
1131
1132 local_id += 1
1133 raw_frame_id.value = local_id
1134
1135 frame_ready_event.set() # Wake inference thread immediately to work on the frame
1136
1137 if is_live:
1138 drift = time.perf_counter() - last_frame_time
1139 if drift > frame_delay * 2:
1140 last_frame_time = time.perf_counter()
1141
1142 cap.release()
1143 shm.close()
1144

◆ check_parking_spots()

find_distance.check_parking_spots ( str cam_name,
list disp_boxes )

Determine occupancy state for each parking spot ROI on a given camera.

For every bounding box received from the inference thread, a boolean pixel mask is created and compared against each pre-rasterised spot mask using bitwise AND. A spot is marked 'Car' if the overlap pixel count exceeds INTERSECT_ALLOWANCE multiplied by the total spot mask area. Each car is assigned to the single spot it overlaps the most, preventing one large vehicle from claiming multiple spots.

To avoid flickering when a car is momentarily missed by the detector, a spot only transitions back to 'Empty' after EMPTY_CONFIRM_FRAMES consecutive frames with no detection, tracked via _empty_streak.

Parameters
cam_name    Camera identifier string, e.g. "Left" or "Right".
disp_boxes  List of [x1, y1, x2, y2] bounding boxes in display pixel coordinates.
    Spot is occupied if a bounding box overlaps at least INTERSECT_ALLOWANCE (15%) of its ROI area

Definition at line 362 of file find_distance.py.

362def check_parking_spots(cam_name: str, disp_boxes: list):
363 """
364 Spot is occupied if bounding box overlaps 10% of its ROI
365 """
366
367 masks = SPOT_MASK.get(cam_name, [])
368 min_h = MIN_BOX_H.get(cam_name, 0)
369 if not masks:
370 return
371
372 if len(_empty_streak[cam_name]) != len(masks):
373 _empty_streak[cam_name] = [0] * len(masks)
374
375 car_pixels = []
376 # occupied_pixels = np.zeros((DISP_H,DISP_W), dtype=bool)
377
378 for box in disp_boxes:
379 x1, y1, x2, y2 = box
380
381 if (y2 - y1) < min_h:
382 continue
383
384 x1, y1 = max(0, int(x1)), max(0, int(y1))
385 x2, y2 = min(DISP_W, int(x2)), min(DISP_H, int(y2))
386 if x2 > x1 and y2 > y1:
387 m = np.zeros((DISP_H, DISP_W), dtype=bool)
388 m[y1:y2, x1:x2] = True
389 car_pixels.append(m)
390
391 spot_claimed_by = {}
392 for car_mask in car_pixels:
393 best_spot = -1
394 best_overlap = 0
395 for idx, mask in enumerate(masks):
396 overlap = np.count_nonzero(car_mask & mask)
397 if overlap > best_overlap:
398 best_overlap = overlap
399 best_spot = idx
400 if best_spot >= 0 and best_overlap >= masks[best_spot].sum() * INTERSECT_ALLOWANCE:
401 if best_spot not in spot_claimed_by or best_overlap > spot_claimed_by[best_spot]:
402 spot_claimed_by[best_spot] = best_overlap
403
404 with store_lock:
405 current_states = list(spot_states.get(cam_name, ['Empty'] * len(masks)))
406 new_states = list(current_states)
407
408 for idx in range(len(masks)):
409 if idx in spot_claimed_by:
410 new_states[idx] = 'Car'
411 _empty_streak[cam_name][idx] = 0
412 else:
413 _empty_streak[cam_name][idx] += 1
414 if _empty_streak[cam_name][idx] >= EMPTY_CONFIRM_FRAMES:
415 new_states[idx] = 'Empty'
416 spot_states[cam_name] = new_states
417

◆ detect_spot_boundaries()

tuple[list[float], int] find_distance.detect_spot_boundaries ( np.ndarray frame,
int near_y,
int far_y,
int left_x,
int right_x,
int n_spots,
list detections = None,
str cam_name = None )

Detect parking spot divider X positions from lane markings in a camera frame.

Pipeline:

  1. Thresholds the greyscale frame to isolate bright white lane markings.
  2. Applies morphological closing to fill small gaps in the markings.
  3. Runs Canny edge detection and masks to the parking area band.
  4. Uses Probabilistic Hough Transform to find line segments.
  5. Filters for near-vertical lines (45°–135°) and projects their X intercepts to near_y and far_y to produce (near_x, far_x) boundary pairs.
  6. Clusters nearby dividers (within 20px) and merges narrow gaps.
  7. If the detected count matches n_spots+1, returns those boundaries. If too many are found, picks the n_spots best-matching ones. If too few, falls back to _try_from_cars().

When CALIB_DEBUG is set, saves an annotated debug frame to _debug_frames.

Parameters
frame: BGR display-resolution frame to analyse.
near_y: Y pixel of the near edge of the parking area.
far_y: Y pixel of the far edge of the parking area.
left_x: Left boundary X pixel.
right_x: Right boundary X pixel.
n_spots: Expected number of parking spots.
detections: Optional list of [x1,y1,x2,y2] detection boxes for fallback.
cam_name: Camera name used for debug frame storage.
Returns
Tuple of (boundaries, n_spots) where boundaries is a list of (near_x, far_x) tuples or plain floats, and n_spots is the detected spot count.

Definition at line 1748 of file find_distance.py.

1749 detections: list = None, cam_name: str = None) -> tuple[list[float], int]:
1750 uniform = [left_x + (right_x - left_x) * i / n_spots for i in range(n_spots + 1)]
1751 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
1752 _, white_mask = cv2.threshold(gray, 160, 255, cv2.THRESH_BINARY)
1753 kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
1754 white_mask = cv2.morphologyEx(white_mask, cv2.MORPH_CLOSE, kernel)
1755 # blur = cv2.GaussianBlur(gray, (3,3), 0)
1756 edges = cv2.Canny(white_mask, 50, 150)
1757
1758 roi_mask = np.zeros_like(edges)
1759 band_top = max(0, far_y - 10)
1760 roi_mask[band_top:near_y + 10, left_x:right_x] = 255
1761 edges = cv2.bitwise_and(edges, roi_mask)
1762
1763 lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=20, minLineLength=25, maxLineGap=30)
1764 divider_xs = []
1765 for l in (lines if lines is not None else []):
1766 x1, y1, x2, y2 = l[0]
1767 angle = abs(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
1768 if 45 < angle < 135:
1769
1770 if y2 != y1:
1771 t = (near_y - y1) / (y2 - y1)
1772 x_at_near = x1 + t * (x2 - x1)
1773 t_far = (far_y - y1) / (y2 - y1)
1774 x_far = x1 + t_far * (x2 - x1)
1775 else:
1776 x_at_near = x_far = (x1 + x2) / 2
1777 if left_x <= x_at_near <= right_x:
1778 divider_xs.append((x_at_near, x_far))
1779 if os.environ.get('CALIB_DEBUG'):
1780 dbg = frame.copy()
1781 if lines is not None:
1782 for l in lines:
1783 x1, y1, x2, y2 = l[0]
1784 angle = abs(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
1785 if 45 < angle < 135:
1786 cv2.line(dbg, (x1, y1), (x2, y2), (255, 0, 0), 1)
1787 for xn, xf in divider_xs:
1788 cv2.line(dbg, (int(xn), near_y), (int(xf), far_y), (0, 255, 0), 2)
1789 cv2.line(dbg, (left_x, near_y), (right_x, near_y), (0, 200, 255), 1)
1790 cv2.line(dbg, (left_x, far_y), (right_x, far_y), (0, 200, 255), 1)
1791 if cam_name:
1792 _debug_frames[cam_name] = dbg
1793
1794 if lines is None or len(divider_xs) < 2:
1795 return _try_from_cars(detections, near_y, left_x, right_x, n_spots, uniform)
1796
1797 divider_xs.sort(key=lambda d: d[0])
1798 clustered = []
1799 group = [divider_xs[0]]
1800 for x in divider_xs[1:]:
1801 if x[0] - group[0][0] < 20:
1802 group.append(x)
1803 else:
1804 clustered.append((np.mean([g[0] for g in group]), np.mean([g[1] for g in group])))
1805 group = [x]
1806 clustered.append((np.mean([g[0] for g in group]), np.mean([g[1] for g in group])))
1807
1808 xs_near = [left_x] + [c[0] for c in clustered] + [right_x]
1809 xs_far = [left_x] + [c[1] for c in clustered] + [right_x]
1810 all_bounds = sorted(zip(xs_near, xs_far), key=lambda p: p[0])
1811 xs_near = [p[0] for p in all_bounds]
1812 xs_far = [p[1] for p in all_bounds]
1813
1814 paired = list(zip(xs_near, xs_far))
1815 merged_pairs = [paired[0]]
1816 for i in range(1, len(paired)):
1817 width = paired[i][0] - merged_pairs[-1][0]
1818 if width >= 80.0 or len(merged_pairs) == 1:
1819 merged_pairs.append(paired[i])
1820 else:
1821 merged_pairs[-1] = paired[i]
1822 xs_far = [p[1] for p in merged_pairs]
1823 xs_near = [p[0] for p in merged_pairs]
1824 if len(xs_near) == n_spots + 1:
1825 return list(zip(xs_near, xs_far)), len(xs_near) - 1
1826
1827 if len(xs_near) > n_spots + 1:
1828 expected = (right_x - left_x) / n_spots
1829 best_near, best_far = [left_x], [left_x]
1830 interior = list(zip(xs_near[1:-1], xs_far[1:-1]))
1831 for i in range(1, n_spots):
1832 target = left_x + expected * i
1833 closest = min(interior, key=lambda p: abs(p[0] - target))
1834 best_near.append(closest[0])
1835 best_far.append(closest[1])
1836 best_near.append(right_x)
1837 best_far.append(right_x)
1838 return list(zip(best_near, best_far)), len(best_near) - 1
1839
1840 return _try_from_cars(detections, near_y, left_x, right_x, n_spots, uniform)
1841

◆ display_loop()

find_distance.display_loop ( stop_event)

Display loop — renders the live camera feed with overlays in a GUI window.

Runs in the main thread (or a dedicated thread) when -T / --test is passed. Pinned to CPU core 5. Creates a single OpenCV window and stitches all camera panels side by side each frame. For each panel it:

  • Copies the latest raw frame from shared memory.
  • Calls draw_spot_overlays() to blend ROI polygons.
  • Draws cached detection boxes and labels from _last_ann_boxes.
  • Overlays the camera name and occupancy count.

After stitching, roi_editor.draw_overlay() is called to paint the ROI editor UI if it is active. An optional CALIB_DEBUG window shows edge detection output.

Keyboard bindings (in addition to ROI editor keys):

  • U: Trigger a manual auto-calibration pass for all cameras.
  • C: Toggle the CalibWindow trackbar panel for the active camera.
  • Q: Set STOP_EVENT and exit.
Parameters
stop_event: Multiprocessing Event polled to exit the loop.

Definition at line 1290 of file find_distance.py.

1290def display_loop(stop_event):
1291 """
1292 Displays what the cams see.
1293 Press I to toggle the interactive ROI editor.
1294 """
1295
1296 try:
1297 os.sched_setaffinity(0, {5})
1298 except Exception:
1299 pass
1300
1301 win = 'Parking Finder'
1302 cv2.namedWindow(win, cv2.WINDOW_NORMAL)
1303 cv2.resizeWindow(win, DISP_W * len(DISPLAY_ORDER), DISP_H)
1304 cv2.setMouseCallback(win, roi_editor.mouse_cb)
1305
1306 frame_time = 1.0 / 30
1307 last_time = 0.0
1308
1309 while not stop_event.is_set():
1310 now = time.time()
1311 if now - last_time < frame_time:
1312 time.sleep(0.005)
1313 continue
1314
1315 panels = []
1316 for i in DISPLAY_ORDER:
1317 cam_name = CAM_ORDER[i]
1318 with RAW_LOCKS[i]:
1319 panel = RAW_BUFS[i].copy()
1320
1321 if panel.shape[:2] != (DISP_H, DISP_W):
1322 panel = cv2.resize(panel, (DISP_W, DISP_H), interpolation=cv2.INTER_NEAREST)
1323
1324 if cam_name in SPOT_CAMS:
1325 panel = draw_spot_overlays(panel, cam_name)
1326
1327 with store_lock:
1328 ann_boxes = list(_last_ann_boxes.get(cam_name, []))
1329
1330 min_h = MIN_BOX_H.get(cam_name, 0)
1331 for box, score, name in ann_boxes:
1332 x1, y1, x2, y2 = box
1333 if (y2 - y1) < min_h:
1334 continue
1335 x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
1336 color = (0, 150, 150)
1337 cv2.rectangle(panel, (x1, y1), (x2, y2), color, 2)
1338 label = f'{name} {score:.2f}'
1339 (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_COMPLEX, 0.5, 1)
1340 cv2.rectangle(panel, (x1, y1 - th - 8), (x1 + tw + 6, y1), color, -1)
1341 cv2.putText(panel, label, (x1 + 3, y1 - 4), cv2.FONT_HERSHEY_COMPLEX, 0.5, (15, 15, 15), 1, cv2.LINE_AA)
1342
1343 cv2.putText(panel, cam_name, (10, 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 230, 200), 2, cv2.LINE_AA)
1344 if cam_name in SPOT_CAMS:
1345 with store_lock:
1346 states = spot_states.get(cam_name, [])
1347 occ = sum(1 for s in states if s == 'Car')
1348 cv2.putText(panel, f'{occ}/{len(states)} occupied', (10, 56), cv2.FONT_HERSHEY_SIMPLEX, 0.6,
1349 (0, 230, 200), 1, cv2.LINE_AA)
1350
1351 panels.append(panel)
1352
1353 # Stitch panels side by side
1354 canvas = np.hstack(panels)
1355
1356 roi_editor.draw_overlay(canvas)
1357
1358 cv2.imshow(win, canvas)
1359
1360 if os.environ.get('CALIB_DEBUG') and _debug_frames:
1361 debug_panels = []
1362 for cam in CAM_ORDER:
1363 p = _debug_frames.get(cam)
1364 if p is None:
1365 p = np.zeros((DISP_H, DISP_W, 3), dtype=np.uint8)
1366 if p.shape[:2] != (DISP_H, DISP_W):
1367 p = cv2.resize(p, (DISP_W, DISP_H))
1368 cv2.putText(p, f'DEBUG: {cam}', (10, 25),
1369 cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
1370 debug_panels.append(p)
1371 cv2.imshow('boundary_debug', np.hstack(debug_panels))
1372
1373 key = cv2.waitKey(1) & 0xFF
1374
1375 if roi_editor.handle_key(key):
1376 continue
1377
1378 if key == ord('u') or key == ord('U'):
1379 for active_idx in range(len(SOURCES)):
1380 cam_name = CAM_ORDER[active_idx]
1381 with RAW_LOCKS[active_idx]:
1382 snap = RAW_BUFS[active_idx].copy()
1383 snap_disp = cv2.resize(snap, (DISP_W, DISP_H), interpolation=cv2.INTER_NEAREST)
1384 with store_lock:
1385 boxes = _last_boxes.get(cam_name, [])
1386 if len(boxes) > 2:
1387 print(f"\033[1;93mWarning: [Auto Caliberation] {cam_name} has {len(boxes)} detections\033[0m")
1388 new_rois = auto_caliberate_rois(cam_name, snap_disp, CALIB_PARAMS[cam_name], boxes)
1389 if new_rois:
1390 rebuild_spot_masks(cam_name, new_rois)
1391 continue
1392
1393 if key == ord('c') or key == ord('C'):
1394 calib_window.toggle(roi_editor.active_cam)
1395 continue
1396 if key == ord('q'):
1397 stop_event.set()
1398 break
1399
1400 calib_window.tick()
1401 last_time = now
1402
1403 calib_window.close()
1404 cv2.destroyAllWindows()
1405
1406# @brief Blend detected spot boundaries with manually set ones from the CalibWindow.
1407#
1408# Performs a weighted average between auto-detected and manually specified boundary
1409# X positions. The manual weight is kept intentionally small (default 0.15) so the
1410# auto-detection result dominates while the manual input provides a gentle stabilising
1411# nudge. The perspective angle offset (far_x - near_x) of the detected boundary is
1412# preserved through the blend.
1413#
1414# Returns detected unchanged if manual is empty or the lists differ in length.
1415#
1416# @param detected List of detected boundary positions. Each entry is either
1417# a float (uniform) or a (near_x, far_x) tuple.
1418# @param manual List of manually specified boundary X positions (floats).
1419# @param manual_weight Blend weight for the manual values (0.0–1.0, default 0.15).
1420# @return Blended boundary list as (near_x, far_x) tuples.
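The weighted blend described above can be sketched as follows. The function name and structure are illustrative; only the 0.15 default weight, the mismatch passthrough, and the preserved (far_x - near_x) offset come from the description.

```python
def blend_boundaries(detected, manual, manual_weight=0.15):
    """Nudge detected (near_x, far_x) boundaries toward manual near-edge positions.

    The perspective offset (far_x - near_x) of each detected boundary is kept,
    so only the near-edge position is blended.
    """
    if not manual or len(manual) != len(detected):
        return detected            # nothing to blend against
    blended = []
    for d, m in zip(detected, manual):
        near, far = d if isinstance(d, tuple) else (d, d)
        offset = far - near        # preserve the perspective angle
        new_near = (1 - manual_weight) * near + manual_weight * m
        blended.append((new_near, new_near + offset))
    return blended

out = blend_boundaries([(100.0, 110.0)], [120.0])
```

With the small default weight the auto-detected position dominates, as the comment above intends.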

◆ draw_spot_overlays()

np.ndarray find_distance.draw_spot_overlays ( np.ndarray frame,
str cam_name )

Blend semi-transparent ROI polygon overlays onto a camera frame.

For each defined parking spot, a filled polygon is drawn onto a copy of the frame using the occupancy-dependent colour (OCCUPIED_COLOR or EMPTY_COLOR), then blended back onto the original using cv2.addWeighted with SPOT_ALPHA opacity. A spot ID and state label are drawn at the polygon centroid.

Parameters
frame: BGR uint8 NumPy array to draw onto. Not modified in-place; a blended copy is returned.
cam_name: Camera identifier used to look up ROIS and spot_states.
Returns
New BGR frame with spot overlays blended in.

Definition at line 460 of file find_distance.py.

460def draw_spot_overlays(frame: np.ndarray, cam_name: str) -> np.ndarray:
461 """
462 Blend ROI polygons into the frame
463 """
464
465 spots = ROIS.get(cam_name, [])
466 states = spot_states.get(cam_name, ['Empty'] * len(spots))
467 overlay = frame.copy()
468
469 for spot, state in zip(spots, states):
470 pts = np.array(spot['poly'], dtype=np.int32)
471 color = OCCUPIED_COLOR if state == 'Car' else EMPTY_COLOR
472 border = (255, 80, 80) if state == 'Car' else (80, 255, 130)
473
474 cv2.fillPoly(overlay, [pts], color)
475 cv2.polylines(overlay, [pts], isClosed=True, color=border, thickness=2)
476
477 cx = int(np.mean([p[0] for p in spot['poly']]))
478 cy = int(np.mean([p[1] for p in spot['poly']]))
479 label = f"{spot['id']}: {state}"
480 (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.45, 1)
481
482 cv2.rectangle(overlay,
483 (cx - tw // 2 - 3, cy - th - 6),
484 (cx + tw // 2 + 3, cy + 4),
485 (20, 20, 20), -1)
486
487 cv2.putText(overlay, label, (cx - tw // 2, cy), cv2.FONT_HERSHEY_SIMPLEX, 0.45, border, 1, cv2.LINE_AA)
488
489 return cv2.addWeighted(overlay, SPOT_ALPHA, frame, 1 - SPOT_ALPHA, 0)
490

◆ generate_rois()

list[dict] find_distance.generate_rois ( str cam_id,
tuple vanishing_point,
int near_y,
int far_y,
int left_x,
int right_x,
int n_spots = 5,
int n_rows = 1,
float fisheye_distortion = 0.0,
float perspective_strength = 1.0,
str name = None,
list spot_boundaries = None,
** kwargs )

Procedurally generate perspective-correct parking spot ROI polygons.

Computes ROI polygons that match real-world lane markings under camera perspective distortion. The algorithm works by interpolating horizontal boundary positions between a near Y row and a far Y row using a vanishing point to model convergence, then optionally applying a radial fisheye correction. Three internal helpers handle the geometry:

horizontal_bounds(y): Returns the left/right X extents at a given Y row, blending between a rectangular layout and full-perspective convergence toward the vanishing point using perspective_strength.

scale_x_to_y(x_at_near, y): Projects a single X position from the near row to any other Y row along the perspective gradient.

fisheye(x, y): Applies barrel/pincushion radial distortion correction. A positive fisheye_distortion value corrects barrel distortion (wide-angle lenses); negative corrects pincushion.

If spot_boundaries is provided as a list of (near_x, far_x) tuples, those explicit positions are used instead of uniform division.
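The convergence performed by horizontal_bounds can be sketched standalone. The interpolation matches the listing below (with the zero-denominator guards omitted for brevity); the vanishing point and extents are illustrative values.

```python
def horizontal_bounds(y, vanishing_point, near_y, left_x, right_x, strength):
    """Left/right X extents at row y, blended toward the vanishing point."""
    vpx, vpy = vanishing_point
    t = (y - near_y) / (vpy - near_y)      # 0 at the near row, 1 at the VP row
    t = max(0.0, min(t, 1.0))
    xl = left_x + (vpx - left_x) * t * strength
    xr = right_x + (vpx - right_x) * t * strength
    return xl, xr

# With full perspective strength the bounds pinch toward the vanishing point:
b_near = horizontal_bounds(400, (320, 100), 400, 0, 640, 1.0)
b_vp = horizontal_bounds(100, (320, 100), 400, 0, 640, 1.0)
```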

Parameters
cam_id: Camera identifier string, used as spot ID prefix.
vanishing_point: (x, y) pixel coordinate of the perspective vanishing point.
near_y: Y pixel of the near (bottom) edge of the parking area.
far_y: Y pixel of the far (top) edge of the parking area.
left_x: Left boundary X pixel of the parking area.
right_x: Right boundary X pixel of the parking area.
n_spots: Number of parking spots per row (default 5).
n_rows: Number of rows of spots (default 1).
fisheye_distortion: Radial distortion coefficient. 0.0 disables correction.
perspective_strength: Blend between rect (0.0) and full-perspective (1.0) layout.
name: Optional spot ID prefix override (defaults to cam_id[0]).
spot_boundaries: Optional pre-computed boundary list. Length must be n_spots+1.
Returns
List of dicts, each with keys 'id' (str) and 'poly' (list of (x,y) int tuples).

Definition at line 526 of file find_distance.py.

540) -> list[dict]:
541 vpx, vpy = vanishing_point
542 W, H = DISP_W, DISP_H
543 cx, cy = W / 2, H / 2
544 prefix = name or cam_id[0].upper()
545
546 def horizontal_bounds(y):
547
548 denominator = vpy - near_y
549 if denominator == 0:
550 denominator = far_y - near_y
551 if denominator == 0:
552 return left_x, right_x
553 t = (y - near_y) / denominator
554 t = max(0.0, min(t, 1.0))
555 xl_perspective = left_x + (vpx - left_x) * t
556 xr_perspective = right_x + (vpx - right_x) * t
557 xl_rect = left_x
558 xr_rect = right_x
559
560 xl = xl_rect + (xl_perspective - xl_rect) * perspective_strength
561 xr = xr_rect + (xr_perspective - xr_rect) * perspective_strength
562
563 return xl, xr
564
565 def scale_x_to_y(x_at_near, y):
566 denominator = vpy - near_y
567 if denominator == 0:
568 denominator = far_y - near_y
569 if denominator == 0:
570 return x_at_near
571 t = (y - near_y) / denominator
572 t = max(0.0, min(t, 1.0))
573 return x_at_near + (vpx - x_at_near) * t * perspective_strength
574
575 def fisheye(x, y):
576
577 if fisheye_distortion == 0.0:
578 return x, y
579 nx, ny = (x - cx) / cx, (y - cy) / cy
580 r2 = nx * nx + ny * ny
581 f = 1 + fisheye_distortion * r2
582 return cx + nx * f * cx, cy + ny * f * cy
583
584 rois = []
585 for row in range(n_rows):
586 t_near = row / n_rows
587 t_far = (row + 1) / n_rows
588 row_near_y = near_y + (far_y - near_y) * t_near
589 row_far_y = near_y + (far_y - near_y) * t_far
590
591 for s in range(n_spots):
592 if spot_boundaries and len(spot_boundaries) == n_spots + 1:
593 if isinstance(spot_boundaries[0], tuple):
594 x0_near, x0_far = spot_boundaries[s]
595 x1_near, x1_far = spot_boundaries[s + 1]
596 else:
597 x0_near = scale_x_to_y(spot_boundaries[s], row_near_y)
598 x1_near = scale_x_to_y(spot_boundaries[s + 1], row_near_y)
599 x0_far = scale_x_to_y(spot_boundaries[s], row_far_y)
600 x1_far = scale_x_to_y(spot_boundaries[s + 1], row_far_y)
601 else:
602 near_xl, near_xr = horizontal_bounds(row_near_y)
603 far_xl, far_xr = horizontal_bounds(row_far_y)
604 t0, t1 = s / n_spots, (s + 1) / n_spots
605 x0_near = near_xl + (near_xr - near_xl) * t0
606 x1_near = near_xl + (near_xr - near_xl) * t1
607 x0_far = far_xl + (far_xr - far_xl) * t0
608 x1_far = far_xl + (far_xr - far_xl) * t1
609
610 pts_raw = [
611 (x0_near, row_near_y),
612 (x1_near, row_near_y),
613 (x1_far, row_far_y),
614 (x0_far, row_far_y),
615 ]
616
617 poly = [
618 (int(round(fx)), int(round(fy)))
619 for x, y in pts_raw
620 for fx, fy in [fisheye(x, y)]
621 ]
622 rois.append({'id': f'{prefix}{row * n_spots + s + 1}', 'poly': poly})
623 return rois
624

◆ get_detections()

find_distance.get_detections ( )

Flask REST endpoint — returns current parking spot occupancy states.

Called by the front end or any HTTP client to poll occupancy without needing direct access to the process. Acquires store_lock briefly to get a consistent snapshot of spot_states.

Example response
{
"timestamp": "14:32:01",
"spots": {
"Left": ["Empty", "Car", "Empty", "Empty", "Car", "Empty"],
"Right": ["Car", "Car", "Empty", "Empty", "Car"]
}
}
Returns
Flask JSON response with HTTP 200.

Definition at line 340 of file find_distance.py.

340def get_detections():
341 with store_lock:
342 data = {
343 "timestamp": datetime.now().strftime('%H:%M:%S'),
344 'spots': {cam: list(states) for cam, states in spot_states.items()},
345 }
346 return jsonify(data)
347
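A client polling this endpoint can derive free-spot counts directly from the documented payload. The sketch below makes no network call; it parses the example response shown above.

```python
import json

# Example payload, pasted from the documented response format above.
payload = json.loads("""
{
  "timestamp": "14:32:01",
  "spots": {
    "Left":  ["Empty", "Car", "Empty", "Empty", "Car", "Empty"],
    "Right": ["Car", "Car", "Empty", "Empty", "Car"]
  }
}
""")

# Count free spots per camera from the per-spot state strings.
free = {cam: states.count('Empty') for cam, states in payload['spots'].items()}
```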

◆ inference_loop()

find_distance.inference_loop ( bool need_annotations,
frame_ready_event,
stop_event )

YOLO inference thread — batches frames from all cameras and runs detection.

Runs as a daemon Thread pinned to CPU core 4. On startup it performs 5 warm-up inference passes with dummy frames to prime the TensorRT engine and avoid a latency spike on the first real frame.

Each iteration checks whether new frames are available by comparing raw_frame_id against last_seen. If new frames exist they are resized to IMGSZ, padded to MAX_BATCH with blank frames, and passed to the YOLO model in a single batched call. Detections are scaled back to display resolution using sx/sy factors.
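The sx/sy rescaling from model space back to display resolution is a per-coordinate multiply. A sketch with illustrative sizes (the real IMGSZ, DISP_W, and DISP_H come from the module configuration):

```python
IMGSZ, DISP_W, DISP_H = 640, 1280, 720   # illustrative values
sx, sy = DISP_W / IMGSZ, DISP_H / IMGSZ

def to_display(box):
    """Scale an [x1, y1, x2, y2] box from IMGSZ model space to display pixels."""
    x1, y1, x2, y2 = box
    return [x1 * sx, y1 * sy, x2 * sx, y2 * sy]

disp = to_display([100, 200, 300, 400])
```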

Results are processed per camera:

  • check_parking_spots() updates occupancy state.
  • _last_boxes and _last_ann_boxes are updated under store_lock.
  • If need_annotations is True, annotated frames are written to ANN_BUFS.

Sleeps briefly and waits on frame_ready_event between inference cycles to avoid busy-waiting while respecting the INFERENCEFPS cap.

Parameters
need_annotations: If True, annotated frames are rendered and written to ANN_BUFS.
frame_ready_event: Event set by capture workers when a new frame is available.
stop_event: Event polled to trigger shutdown.

Definition at line 1167 of file find_distance.py.

1167def inference_loop(need_annotations: bool, frame_ready_event, stop_event):
1168 try:
1169 os.sched_setaffinity(0, {4})
1170 except Exception:
1171 pass
1172
1173 model = YOLO(MODEL_PATH, task='detect')
1174 if not MODEL_PATH.endswith('.engine'):
1175 model.fuse()
1176
1177 dummy = np.zeros((IMGSZ, IMGSZ, 3), dtype=np.uint8)
1178 # model.predict([dummy] * MAX_BATCH, imgsz=IMGSZ, conf=CONF,device='cuda', verbose=False, half=True)
1179 for _ in range(5):
1180 model.predict([dummy] * MAX_BATCH, imgsz=IMGSZ, conf=CONF, device='cuda', verbose=False, half=True)
1181 # model.predict([dummy] * MAX_BATCH, imgsz=IMGSZ, conf=CONF, verbose=False, half=True)
1182
1183 print('[Inference] warm-up done')
1184
1185 sx = DISP_W / IMGSZ
1186 sy = DISP_H / IMGSZ
1187
1188 last_seen = [0] * 2
1189 frame_time = 1.0 / INFERENCEFPS
1190 last_time = 0.0
1191
1192 while not stop_event.is_set():
1193 now = time.time()
1194
1195 if now - last_time < frame_time:
1196 frame_ready_event.wait(timeout=0.05)
1197 frame_ready_event.clear()
1198 continue
1199
1200 frames = []
1201 raw_full_list = []
1202 valid_indices = []
1203
1204 for i in INF_IDX:
1205 fid = RAW_FRAME_ID[i].value
1206
1207 if fid == 0 or fid == last_seen[i]:
1208 continue
1209
1210 with RAW_LOCKS[i]:
1211 raw_full = RAW_BUFS[i].copy()
1212
1213 raw_full_list.append(raw_full)
1214
1215 frames.append(cv2.resize(raw_full, (IMGSZ, IMGSZ), interpolation=cv2.INTER_NEAREST))
1216 valid_indices.append(i)
1217
1218 if not frames:
1219 time.sleep(0.005)
1220 continue
1221
1222 # padded = frames
1223 padded = list(frames)
1224 while len(padded) < MAX_BATCH:
1225 padded.append(np.zeros((IMGSZ, IMGSZ, 3), dtype=np.uint8))
1226
1227 try:
1228 results = model.predict(padded, imgsz=IMGSZ, conf=CONF, device='cuda', classes=CLASSES, verbose=False,
1229 half=True)
1230
1231 # results = model.predict(padded, imgsz=IMGSZ, conf=CONF, classes=CLASSES, verbose=False,
1232 # half=True)
1233
1234 for result, i, raw_full in zip(results, valid_indices, raw_full_list):
1235 cam_name = CAM_ORDER[i]
1236
1237 disp_boxes = [
1238 [x1 * sx, y1 * sy, x2 * sx, y2 * sy]
1239 for x1, y1, x2, y2 in result.boxes.xyxy.tolist()
1240 ]
1241
1242 if cam_name in SPOT_CAMS:
1243 check_parking_spots(cam_name, disp_boxes)
1244 with store_lock:
1245 _last_boxes[cam_name] = list(disp_boxes)
1246
1247 scores = result.boxes.conf.tolist()
1248 names = [model.names[int(c)] for c in result.boxes.cls]
1249 with store_lock:
1250 _last_ann_boxes[cam_name] = list(zip(disp_boxes, scores, names))
1251
1252 if need_annotations:
1253
1254 disp = raw_full.copy()
1255 annotate_frame(disp, disp_boxes, scores, names, cam_name)
1256
1257 if cam_name in SPOT_CAMS:
1258 disp = draw_spot_overlays(disp, cam_name)
1259
1260 with ANN_LOCKS[i]:
1261 np.copyto(ANN_BUFS[i], disp)
1262
1263 last_seen[i] = RAW_FRAME_ID[i].value
1264
1265 except Exception:
1266 traceback.print_exc()
1267 time.sleep(0.1)
1268
1269 last_time = now
1270

◆ merge_narrow_spots()

list[float] find_distance.merge_narrow_spots ( list[float] boundaries,
float min_width = 80.0 )

Remove spot boundaries that would produce slots narrower than min_width pixels.

Iterates through the boundary list and skips any position that would create a slot width below min_width relative to the previous kept boundary. This prevents the ROI generator from producing tiny sliver polygons when line detection finds spurious closely-spaced dividers. The last boundary is always forced to match the original final value to preserve the overall parking area extent.

Parameters
boundaries: List of X boundary positions (floats), including left and right edges.
min_width: Minimum acceptable slot width in pixels (default 80.0).
Returns
Filtered boundary list with narrow gaps merged away.

Definition at line 1705 of file find_distance.py.

1705def merge_narrow_spots(boundaries: list[float], min_width: float = 80.0) -> list[float]:
1706 if len(boundaries) < 2:
1707 return boundaries
1708
1709 merged = [boundaries[0]]
1710 for i in range(1, len(boundaries)):
1711 width = boundaries[i] - merged[-1]
1712 if width < min_width and len(merged) > 1:
1713 continue
1714 merged.append(boundaries[i])
1715
1716 if merged[-1] != boundaries[-1]:
1717 merged[-1] = boundaries[-1]
1718
1719 return merged
1720
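A worked example of the filter. The function body is reproduced from the listing above so the example is self-contained; the boundary values are illustrative.

```python
def merge_narrow_spots(boundaries, min_width=80.0):
    # Reproduced from the listing above for a self-contained example.
    if len(boundaries) < 2:
        return boundaries
    merged = [boundaries[0]]
    for i in range(1, len(boundaries)):
        width = boundaries[i] - merged[-1]
        if width < min_width and len(merged) > 1:
            continue
        merged.append(boundaries[i])
    if merged[-1] != boundaries[-1]:
        merged[-1] = boundaries[-1]
    return merged

# The 220 boundary would create a 10 px sliver next to 210, so it is dropped;
# the right edge (320) is always preserved.
out = merge_narrow_spots([0, 100, 210, 220, 320])
```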

◆ parse_args()

find_distance.parse_args ( )

Parse command-line arguments.

Supports three flags that can be combined freely:

  • -t / -T / --test Open the display window for visual testing.
  • -r / -R / --record Record raw (or annotated) frames to disk.
  • -a / -A / --annotate Burn annotations into the recording (requires -r).
Returns
argparse.Namespace with attributes: test (bool), record (bool), annotate (bool).

Definition at line 101 of file find_distance.py.

101def parse_args():
102 p = argparse.ArgumentParser(description="Parking Finder")
103 p.add_argument("-t", "-T", "--test", action="store_true", help="Enable display")
104 p.add_argument("-R", "-r", "--record", action="store_true", help="Record")
105 p.add_argument("-A", "-a", "--annotate", action="store_true", help="Records the annotated version (requires -r)")
106 return p.parse_args()
107

◆ rebuild_spot_masks()

find_distance.rebuild_spot_masks ( str cam_name,
list[dict] new_rois,
bool reset_states = False )

Rebuild the in-memory ROI and mask tables for a camera after calibration.

Re-rasterises each new ROI polygon into a boolean pixel mask and atomically replaces the global ROIS, SPOT_MASK, and spot_states entries under store_lock. If the number of spots has changed, spot_states and _empty_streak are reset to avoid index mismatches.

Parameters
cam_name: Camera identifier string.
new_rois: New list of ROI dicts from generate_rois() or auto_caliberate_rois().
reset_states: If True, forces spot_states back to all 'Empty' regardless of whether the spot count changed.

Definition at line 799 of file find_distance.py.

799def rebuild_spot_masks(cam_name: str, new_rois: list[dict], reset_states: bool = False):
800 global ROIS, SPOT_MASK, spot_states
801
802 new_masks = []
803 for spot in new_rois:
804 m = np.zeros((DISP_H, DISP_W), dtype=np.uint8)
805 cv2.fillPoly(m, [np.array(spot['poly'], dtype=np.int32)], 1)
806 new_masks.append(m.astype(bool))
807 with store_lock:
808 ROIS[cam_name] = new_rois
809 SPOT_MASK[cam_name] = new_masks
810
811 if reset_states or len(new_rois) != len(spot_states.get(cam_name, [])):
812 spot_states[cam_name] = ['Empty'] * len(new_rois)
813 _empty_streak[cam_name] = [0] * len(new_rois)
814 # else:
815 # _empty_streak[cam_name] = [0] * len(new_rois)
816 print(f"\033[1;91mWarning: [Auto Caliberation] {cam_name}: {len(new_rois)} spots rebuilt\033[0m")
817

◆ record_loop()

find_distance.record_loop ( stop_event,
use_anns = False )

Recording loop — writes camera frames to timestamped MP4 files.

Runs as a daemon Thread when -r / --record is passed. Pinned to CPU core 5 (shared with the display loop, which is absent in headless record mode). Creates one MP4 file per camera in the recordings/ directory, named with a timestamp prefix and the camera name.

When use_anns is True, annotated frames from ANN_BUFS are written instead of raw frames, burning in bounding boxes and ROI overlays permanently. Note that ANN_BUFS are only populated when need_annotations is True, which requires either -T or -r to be passed at startup.

Parameters
stop_event: Multiprocessing Event polled to stop recording and flush files.
use_anns: If True, record annotated frames; otherwise record raw frames.

Definition at line 1595 of file find_distance.py.

1595def record_loop(stop_event, use_anns=False):
1596 """
1597 Writes raw cam frames to the output file (Can be edited to write annotated frames)
1598 """
1599 try:
1600 os.sched_setaffinity(0, {5})
1601 except Exception:
1602 pass
1603
1604 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
1605 os.makedirs('recordings', exist_ok=True)
1606 fourcc = cv2.VideoWriter_fourcc(*'mp4v')
1607 writers = []
1608 suffix = 'Annotated' if use_anns else 'Raw'
1609 for cam_name in CAM_ORDER:
1610 path = f"recordings/{timestamp}_{cam_name}.mp4"
1611 writers.append(cv2.VideoWriter(path, fourcc, CAPTURE_FPS, (CAP_W, CAP_H)))
1612 print(f"[Record] {cam_name} -> {path}")
1613
1614 frame_time = 1.0 / CAPTURE_FPS
1615 last_time = 0.0
1616
1617 if use_anns:
1618 print("\033[1;93mWarning: Recording annotated version\033[0m")
1619 while not stop_event.is_set():
1620 now = time.time()
1621 if now - last_time < frame_time:
1622 time.sleep(0.003)
1623 continue
1624
1625 for i, writer in enumerate(writers):
1626 # write annotated frames when use_anns is set, raw frames otherwise
1627 if use_anns:
1628
1629 with ANN_LOCKS[i]:
1630 writer.write(ANN_BUFS[i])
1631 else:
1632 with RAW_LOCKS[i]:
1633 writer.write(RAW_BUFS[i])
1634 last_time = now
1635
1636 for w in writers:
1637 w.release()
1638 print('[record] files saved')
1639

◆ set_jetson_clocks()

find_distance.set_jetson_clocks ( bool enable)

Lock Jetson hardware clocks at maximum frequency for consistent inference latency.

Stores the current clock state to JETSON_CLOCKS_CONF before locking so it can be restored on shutdown. Should be called at startup (enable=True) and in the atexit handler (enable=False). Prints a coloured warning so the state change is visible in the terminal.

Parameters
enable: True to lock clocks at maximum; False to restore original state.

Definition at line 81 of file find_distance.py.

81def set_jetson_clocks(enable: bool):
82 try:
83 if enable:
84 subprocess.run(['sudo', 'jetson_clocks', '--store', JETSON_CLOCKS_CONF], check=True)
85 subprocess.run(['sudo', 'jetson_clocks'], check=True)
86 print("\033[1;91mWarning: [SYSTEM] JETSON CLOCKS LOCKED\033[0m")
87 else:
88 subprocess.run(['sudo', 'jetson_clocks', '--restore', JETSON_CLOCKS_CONF], check=True)
89 print("\033[1;91mWarning: [SYSTEM] JETSON CLOCKS RESTORED\033[0m")
90 except Exception as e:
91 print(f"\033[1;91mWarning: [SYSTEM] JETSON CLOCKS FAILED: {e}\033[0m")
92

◆ shm_ndarray()

np.ndarray find_distance.shm_ndarray ( SharedMemory shm,
tuple shape )

Create a NumPy array view backed by a SharedMemory block.

No data is copied — the array directly references the shared memory buffer. All processes sharing the same SharedMemory name will see the same bytes.

Parameters
shm: An open SharedMemory object.
shape: Desired NumPy shape tuple, e.g. (480, 640, 3).
Returns
uint8 NumPy array backed by shm.buf.

Definition at line 314 of file find_distance.py.

314def shm_ndarray(shm: SharedMemory, shape: tuple) -> np.ndarray:
315 return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
316
317
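The zero-copy property can be demonstrated with two views over the same block. A self-contained sketch; the display-sized buffers are replaced with a tiny shape for brevity.

```python
import numpy as np
from multiprocessing.shared_memory import SharedMemory

def shm_ndarray(shm, shape):
    """As in the listing above: a uint8 view directly over shm.buf, no copy."""
    return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)

shape = (4, 4, 3)
shm = SharedMemory(create=True, size=int(np.prod(shape)))
writer = shm_ndarray(shm, shape)
reader = shm_ndarray(shm, shape)

writer[:] = 0
writer[0, 0] = (10, 20, 30)
seen = reader[0, 0].tolist()   # the write is visible through the other view

# Views must be released before the block is closed and unlinked,
# otherwise close() raises BufferError on exported memoryviews.
del writer, reader
shm.close()
shm.unlink()
```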

◆ shutdown()

find_distance.shutdown ( )

atexit handler — gracefully shuts down all workers and frees shared memory.

Registered with atexit so it runs automatically on normal exit, KeyboardInterrupt, or unhandled exceptions. Only executes in the main process (guarded by _MAIN_PID) to prevent child capture processes from triggering a double-shutdown.

Sets STOP_EVENT, joins all capture processes with a 3-second timeout, unlinks all shared memory blocks, and restores Jetson clock settings.

Definition at line 1893 of file find_distance.py.

1893def shutdown():
1894 if os.getpid() != _MAIN_PID:
1895 return
1896
1897 STOP_EVENT.set()
1898 print("Shutting down...")
1899
1900 for p in PROCESSES:
1901 try:
1902 p.join(timeout=3)
1903 except Exception:
1904 pass
1905
1906 for shm in RAW_SHM_OBJS + ANN_SHM_OBJS:
1907 try:
1908 shm.close()
1909 shm.unlink()
1910 except Exception:
1911 pass
1912 set_jetson_clocks(False)
1913 print('Done')
1914
1915

Variable Documentation

◆ _

find_distance._
protected

Definition at line 1947 of file find_distance.py.

◆ _debug_frames

dict find_distance._debug_frames = {}
protected

Per camera debug visualisation frames showing detected edges and boundary lines.

Only populated when the CALIB_DEBUG environment variable is set.

Definition at line 300 of file find_distance.py.

◆ _empty_streak

dict find_distance._empty_streak = {cam: [] for cam in CAM_ORDER}
protected

Per camera, per spot counter of consecutive inference frames with no detection.

A spot transitions from 'Car' to 'Empty' only after EMPTY_CONFIRM_FRAMES consecutive empty frames, preventing flickering from momentary missed detections.

Definition at line 292 of file find_distance.py.

◆ _last_ann_boxes

dict find_distance._last_ann_boxes = {cam: [] for cam in CAM_ORDER}
protected

Per camera cache of (box, score, name) tuples from the last inference pass.

Used by the display loop to draw labels without re-running inference.

Definition at line 287 of file find_distance.py.

◆ _last_boxes

dict find_distance._last_boxes = {cam: [] for cam in CAM_ORDER}
protected

Per camera cache of the most recent raw bounding boxes (display coordinates).

Written by the inference thread, read by auto-calibration and the display loop.

Definition at line 283 of file find_distance.py.

◆ _MAIN_PID

find_distance._MAIN_PID = 0
protected

PID of the main process.

Used in the atexit handler to avoid running shutdown logic in child processes.

Definition at line 275 of file find_distance.py.

◆ _manual_bounds

dict find_distance._manual_bounds = {cam: [] for cam in CAM_ORDER}
protected

Per-camera list of manually set spot boundary X positions from the CalibWindow.

Blended into auto-calibrated boundaries with a small weight to stabilise results.

Definition at line 296 of file find_distance.py.

◆ ann

find_distance.ann = SharedMemory(create=True, size=DISP_H * DISP_W * 3)

Definition at line 1954 of file find_distance.py.

◆ ANN_BUFS

list find_distance.ANN_BUFS = []

NumPy arrays mapped onto ANN_SHM_OBJS for zero-copy annotated frame access.

Definition at line 258 of file find_distance.py.

◆ ANN_LOCKS

list find_distance.ANN_LOCKS = []

Per-camera multiprocessing Locks protecting ANN_BUFS entries.

Definition at line 261 of file find_distance.py.

◆ ANN_SHM_OBJS

list find_distance.ANN_SHM_OBJS = []

SharedMemory objects holding annotated frames written by the inference thread.

Definition at line 252 of file find_distance.py.

◆ app

find_distance.app = Flask(__name__)

Definition at line 318 of file find_distance.py.

◆ args

find_distance.args = parse_args()

Definition at line 1917 of file find_distance.py.

◆ AUTO_CALIBRATE_INTERVAL

int find_distance.AUTO_CALIBRATE_INTERVAL = 0

Seconds between automatic ROI recalibration passes.

Set to 0 to disable.

Definition at line 182 of file find_distance.py.

◆ auto_calibrate_t

find_distance.auto_calibrate_t
Initial value:
1= Thread(
2 target=auto_calibrate_loop,
3 args=(STOP_EVENT,),
4 daemon=True,
5 )

Definition at line 1995 of file find_distance.py.

◆ benchmark

find_distance.benchmark

Definition at line 68 of file find_distance.py.

◆ CALIB_PARAMS

dict find_distance.CALIB_PARAMS
Initial value:
1= {
2 'Left': {'vanishing_point': (381, 214), 'near_y': 406, 'far_y': 274,
3 'left_x': 0, 'right_x': 640, 'n_spots': 6, 'n_rows': 1, 'fisheye_distortion': 0.0,
4 'perspective_strength': 0.16, 'min_roi_height': 63},
5 'Right': {'vanishing_point': (277, 223), 'near_y': 451, 'far_y': 337,
6 'left_x': 0, 'right_x': 640, 'n_spots': 5, 'n_rows': 1, 'fisheye_distortion': 0.03,
7 'perspective_strength': 0.32, 'min_roi_height': 70},
8}

Per-camera calibration parameters used by generate_rois() and auto_caliberate_rois().

Keys per camera:

  • vanishing_point (tuple): (x, y) pixel of the perspective vanishing point.
  • near_y (int): Y pixel of the near (bottom) edge of the parking area.
  • far_y (int): Y pixel of the far (top) edge of the parking area.
  • left_x / right_x (int): Horizontal bounds of the parking area.
  • n_spots (int): Expected number of parking spots per row.
  • n_rows (int): Number of rows of spots.
  • fisheye_distortion (float): Lens distortion correction coefficient.
  • perspective_strength (float): 0.0 = no perspective warp, 1.0 = full warp toward vanishing point.
  • min_roi_height (int): Minimum pixel height of a generated ROI polygon.

Definition at line 210 of file find_distance.py.

◆ calib_window

find_distance.calib_window = CalibWindow()

Definition at line 1579 of file find_distance.py.

◆ CAM_ORDER

list find_distance.CAM_ORDER = ['Left', 'Right']

Ordered list of camera name strings.

Index matches SOURCES and shared memory lists.

Definition at line 162 of file find_distance.py.

◆ CAP_H

int find_distance.CAP_H = 480

Capture frame height in pixels.

Definition at line 117 of file find_distance.py.

◆ CAP_SHAPE

tuple find_distance.CAP_SHAPE = (CAP_H, CAP_W, 3)

NumPy shape tuple for a raw capture frame (H, W, 3).

Definition at line 126 of file find_distance.py.

◆ CAP_W

int find_distance.CAP_W = 640

Capture frame width in pixels.

Definition at line 114 of file find_distance.py.

◆ CAPTURE_FPS

int find_distance.CAPTURE_FPS = 30

Target capture framerate for live cameras and recording output.

Definition at line 138 of file find_distance.py.

◆ CLASSES

list find_distance.CLASSES = [2, 3, 5, 7]

COCO class IDs to detect.

2=car, 3=motorcycle, 5=bus, 7=truck.

Definition at line 159 of file find_distance.py.

◆ CONF

float find_distance.CONF = 0.25

YOLO detection confidence threshold.

Detections below this are discarded.

Definition at line 147 of file find_distance.py.

◆ daemon

find_distance.daemon

Definition at line 2009 of file find_distance.py.

◆ deadline

int find_distance.deadline = time.time() + 15

Definition at line 1982 of file find_distance.py.

◆ DISP_H

int find_distance.DISP_H = 480

Display frame height in pixels.

Definition at line 123 of file find_distance.py.

◆ DISP_SHAPE

tuple find_distance.DISP_SHAPE = (DISP_H, DISP_W, 3)

NumPy shape tuple for a display frame (H, W, 3).

Definition at line 129 of file find_distance.py.

◆ DISP_W

int find_distance.DISP_W = 640

Display frame width in pixels.

Definition at line 120 of file find_distance.py.

◆ DISPLAY_ORDER

list find_distance.DISPLAY_ORDER = [0, 1]

Order in which camera panels are stitched horizontally in the display window.

Definition at line 165 of file find_distance.py.

◆ EMPTY_COLOR

tuple find_distance.EMPTY_COLOR = (0, 220, 80)

BGR colour used to fill empty spot overlays (green tint).

Definition at line 230 of file find_distance.py.

◆ EMPTY_CONFIRM_FRAMES

int find_distance.EMPTY_CONFIRM_FRAMES = 3

Number of consecutive inference frames a spot must appear empty before its state is changed from 'Car' to 'Empty'.

Reduces false-empty flicker.

Definition at line 304 of file find_distance.py.

◆ enabled

find_distance.enabled

Definition at line 69 of file find_distance.py.

◆ frame_ready_event

find_distance.frame_ready_event = Event()

Definition at line 1919 of file find_distance.py.

◆ idx

find_distance.idx = int(re.search(r'\d+', line).group())

Definition at line 1936 of file find_distance.py.

◆ IMGSZ

int find_distance.IMGSZ = 128

Input image size (square) fed to the YOLO model in pixels.

Definition at line 144 of file find_distance.py.

◆ INF_IDX

list find_distance.INF_IDX = [0, 1]

Camera indices that are sent to the inference pipeline.

Definition at line 168 of file find_distance.py.

◆ inf_t

find_distance.inf_t
Initial value:
1= Thread(
2 target=inference_loop,
3 args=(need_annotation, frame_ready_event, STOP_EVENT),
4 daemon=True
5 )

Definition at line 2001 of file find_distance.py.

◆ INFERENCEFPS

int find_distance.INFERENCEFPS = 15

Maximum inference framerate.

Caps how often the YOLO model is called.

Definition at line 141 of file find_distance.py.

◆ INTERSECT_ALLOWANCE

float find_distance.INTERSECT_ALLOWANCE = 0.15

Minimum fraction of a spot's mask area that a bounding box must overlap to mark the spot as occupied.

(0.15 = 15%)

Definition at line 179 of file find_distance.py.

◆ JETSON_CLOCKS_CONF

str find_distance.JETSON_CLOCKS_CONF = '/tmp/jetson_clocks_backup.conf'

Temporary file path used to store the original Jetson clock state before locking.

Definition at line 110 of file find_distance.py.

◆ line

find_distance.line = f.readline()

Definition at line 1929 of file find_distance.py.

◆ m

find_distance.m = np.zeros((DISP_H, DISP_W), dtype=np.uint8)

Definition at line 222 of file find_distance.py.

◆ MAX_BATCH

int find_distance.MAX_BATCH = 3

Maximum batch size passed to the YOLO model per inference call.

Note
Padding frames are added when fewer cameras are active.

Definition at line 156 of file find_distance.py.

◆ MIN_BOX_H

dict find_distance.MIN_BOX_H = {'Left': 30, 'Right': 30}

Per-camera minimum bounding box height in pixels.

Boxes shorter than this are ignored to filter out distant vehicles.

Definition at line 175 of file find_distance.py.

◆ MODEL_PATH

str find_distance.MODEL_PATH = "./models/yolo11n.engine"

Path to the YOLO model file.

Supports .pt (PyTorch) and .engine (TensorRT).

Definition at line 150 of file find_distance.py.

◆ need_annotation

find_distance.need_annotation = args.test or args.record

Definition at line 1923 of file find_distance.py.

◆ OCCUPIED_COLOR

tuple find_distance.OCCUPIED_COLOR = (0, 0, 220)

BGR colour used to fill occupied spot overlays (red tint).

Definition at line 227 of file find_distance.py.

◆ ok

find_distance.ok

Definition at line 1947 of file find_distance.py.

◆ p

find_distance.p
Initial value:
1= Process(
2 target=capture_worker,
3 args=(i, src, RAW_SHM_OBJS[i].name, RAW_LOCKS[i], RAW_FRAME_ID[i], frame_ready_event, STOP_EVENT),
4 daemon=False,
5 )

Definition at line 1969 of file find_distance.py.

◆ port

find_distance.port = float(re.search(r'\d+\.\d+', line).group())

Definition at line 1934 of file find_distance.py.

◆ probe

find_distance.probe = cv2.VideoCapture(src)

Definition at line 1946 of file find_distance.py.

◆ PROCESSES

list find_distance.PROCESSES = []

List of capture worker Process objects, kept for clean shutdown.

Definition at line 271 of file find_distance.py.

◆ raw

find_distance.raw = SharedMemory(create=True, size=CAP_H * CAP_W * 3)

Definition at line 1953 of file find_distance.py.

◆ RAW_BUFS

list find_distance.RAW_BUFS = []

NumPy arrays mapped onto RAW_SHM_OBJS for zero-copy frame access.

Definition at line 255 of file find_distance.py.
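
The zero-copy mapping can be sketched like this, mirroring what shm_ndarray() presumably does (the file's exact implementation is not shown here):

```python
import numpy as np
from multiprocessing.shared_memory import SharedMemory

CAP_H, CAP_W = 480, 640
CAP_SHAPE = (CAP_H, CAP_W, 3)

# Allocate one BGR frame's worth of shared memory and view it as an
# ndarray; writes through the view land directly in the shared block.
shm = SharedMemory(create=True, size=CAP_H * CAP_W * 3)
buf = np.ndarray(CAP_SHAPE, dtype=np.uint8, buffer=shm.buf)
buf[:] = 0
total = int(buf.sum())   # 0: the whole frame was zeroed in place

del buf        # release the exported view before closing
shm.close()
shm.unlink()
```

Note that the ndarray keeps the underlying buffer exported; dropping the view (here via `del`) before `close()` avoids a BufferError.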

◆ RAW_FRAME_ID

list find_distance.RAW_FRAME_ID = []

Per camera shared Value('i') counters incremented each time a new frame is written.

Used by the inference thread to detect stale frames without locking.

Definition at line 268 of file find_distance.py.
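
The lock-free staleness check this enables can be sketched as (function names are illustrative, not taken from the file):

```python
from multiprocessing import Value

frame_id = Value('i', 0)     # shared counter, one per camera

def publish_frame():
    """Writer side: bump the counter after writing a new frame."""
    with frame_id.get_lock():
        frame_id.value += 1

def is_fresh(last_seen):
    """Reader side: compare counters without taking the frame lock."""
    return frame_id.value != last_seen

publish_frame()
print(is_fresh(0))  # → True: a new frame arrived since id 0
```

The reader only takes the heavier frame lock when the counter has moved, so stale frames are skipped cheaply.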

◆ RAW_LOCKS

list find_distance.RAW_LOCKS = []

Per-camera multiprocessing Locks protecting RAW_BUFS entries.

Definition at line 264 of file find_distance.py.

◆ RAW_SHM_OBJS

list find_distance.RAW_SHM_OBJS = []

SharedMemory objects holding raw (unannotated) camera frames.

Definition at line 249 of file find_distance.py.

◆ rec_t

find_distance.rec_t = Thread(target=record_loop, args=(STOP_EVENT, args.annotate), daemon=True)

Definition at line 2014 of file find_distance.py.

◆ roi_editor

find_distance.roi_editor = ROIEditor()

Definition at line 1042 of file find_distance.py.

◆ ROIS

dict find_distance.ROIS
Initial value:
1= {
2 'Left': [
3 ],
4 'Right': [
5 ],
6}

Per-camera list of parking spot definitions.

Note
Populate these manually or let the calibration system generate them at runtime.

Definition at line 186 of file find_distance.py.

◆ SOURCES

list find_distance.SOURCES = ['./recordings/left_side_cam.mp4', './recordings/right_side_ccam.mp4']

Video sources for each camera.

Can be V4L2 device indices (int) or file paths (str).

Note
Indices are auto-detected at startup by reading v4l2-ctl output. Alternative assignments (commented out in the source):
SOURCES = [0, 0] # Left, Right
SOURCES = ['./Left1.mp4', './Right1.mp4']

Definition at line 135 of file find_distance.py.

◆ SPOT_ALPHA

float find_distance.SPOT_ALPHA = 0.25

Opacity of the spot overlay blend.

0.0 = invisible, 1.0 = fully opaque.

Definition at line 233 of file find_distance.py.

◆ SPOT_CAMS

set find_distance.SPOT_CAMS = {'Left', 'Right'}

Set of camera names that have parking spot ROIs defined and should be checked for occupancy.

Definition at line 171 of file find_distance.py.

◆ SPOT_MASK

dict find_distance.SPOT_MASK = {}

Per-camera list of boolean NumPy arrays (DISP_H x DISP_W) pre-rasterised from ROIS.

True pixels belong to that spot's polygon. Used for fast overlap checks.

Definition at line 195 of file find_distance.py.

◆ spot_states

dict find_distance.spot_states
Initial value:
1= {
2 cam: ['Empty'] * len(spots) for cam, spots in ROIS.items()
3}

Per-camera list of occupancy strings, one per spot.

Values are 'Empty' or 'Car'. Written by check_parking_spots(), read by the Flask API and display loop.

Definition at line 243 of file find_distance.py.
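
The locking pattern behind the Flask reads can be sketched without Flask itself (payload shape and helper name are assumptions, not taken from the file):

```python
import json
import threading

store_lock = threading.Lock()
spot_states = {'Left': ['Empty'] * 6, 'Right': ['Empty'] * 5}

def detections_payload():
    """Snapshot spot_states under store_lock and serialise to JSON.

    Copying inside the lock means the inference thread can keep
    writing while the API serialises a consistent snapshot.
    """
    with store_lock:
        snapshot = {cam: list(states) for cam, states in spot_states.items()}
    return json.dumps(snapshot)

print(detections_payload())
```

The endpoint handler would simply return this payload; holding the lock only for the copy, not the serialisation, keeps contention with the inference thread low.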

◆ STOP_EVENT

find_distance.STOP_EVENT = Event()

Multiprocessing Event shared with all worker processes and threads.

Set to True to signal a clean shutdown across the entire system.

Definition at line 279 of file find_distance.py.

◆ store_lock

find_distance.store_lock = threading.Lock()

Threading lock protecting all shared state variables (spot_states, _last_boxes, etc.) from concurrent reads/writes by the inference and display threads.

Definition at line 238 of file find_distance.py.

◆ target

find_distance.target

Definition at line 2008 of file find_distance.py.

◆ usb_1

float find_distance.usb_1 = 2.1

Definition at line 1926 of file find_distance.py.

◆ usb_2

float find_distance.usb_2 = 2.2

Definition at line 1927 of file find_distance.py.