source/api/live-processing.py
source-code api frame-processing async
File Path: src/api/live_processing.py
Purpose: Frame buffer management and asynchronous keypoint extraction for real-time processing.
Overview
Provides infrastructure for buffering incoming video frames and extracting keypoints asynchronously using thread pools.
Classes
FrameBuffer
Purpose: Circular buffer for managing video frames during inference.
Attributes:
_frames(dict[int, np.ndarray]): Frame storage indexed by frame number_max_size(int): Maximum buffer capacity_latest_idx(int): Index of most recent framelogger: Logger instance
Methods:
__init__(max_size)
Initializes buffer with maximum size.
add_frame(frame)
Adds frame to buffer, removes oldest if full.
Called By: ws_live_signs()
get_frame(idx) -> Optional[np.ndarray]
Retrieves frame by index.
Called By: ws_live_signs()
latest_idx (property)
Returns index of most recent frame.
oldest_idx (property)
Returns index of oldest frame in buffer.
clear()
Clears all frames and resets index.
Called By: ws_live_signs()
Functions
producer_handler(websocket, buffer: FrameBuffer)
Purpose: Asynchronously receives and decodes frames from WebSocket.
Parameters:
websocket: WebSocket connectionbuffer(FrameBuffer): Frame buffer to populate
Implementation:
- Receives binary frame data via WebSocket
- Decodes JPEG frames using cv2.imdecode
- Adds decoded frames to buffer
- Runs until WebSocket closes or error occurs
Called By: ws_live_signs()
Calls:
websocket.receive_bytes()- Receives frame datacv2.imdecode()- Decodes JPEG to numpy array- buffer.add_frame() - Stores frame
get_frame_kps(mp_processor, frame, timestamp_ms=-1)
function async keypoint-extraction
Purpose: Asynchronously extracts keypoints from frame using MediaPipe.
Parameters:
mp_processor(LandmarkerProcessor): MediaPipe processorframe(np.ndarray): Video frametimestamp_ms(int): Frame timestamp
Returns: Extracted keypoints array
Implementation: Uses thread pool executor to run MediaPipe processing off main thread.
Called By: ws_live_signs()
Calls:
Thread Pool
keypoints_detection_executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)Configuration: MAX_WORKERS = 4
Used By: get_frame_kps()
Related Documentation
- websocket.py - Main consumer
- mediapipe_utils.py - Keypoint extraction
- Live Processing Pipeline
File Location: src/api/live_processing.py