139 lines
4.4 KiB
Python
139 lines
4.4 KiB
Python
|
|
import cv2
|
|
import threading
|
|
import time
|
|
from typing import Optional, Callable
|
|
|
|
class VideoFileSource:
    """
    A class to handle video file input using cv2.VideoCapture.

    Frames are read on a background daemon thread started by ``start()``.
    Every frame that is read is handed first to ``data_callback`` and then to
    ``frame_callback`` (whichever are set), and reads are paced to the frame
    rate reported by the file.
    """

    # Frame rate used whenever the file does not report a usable one.
    DEFAULT_FPS = 30

    def __init__(self,
                 file_path: str,
                 data_callback: Optional[Callable[[object], None]] = None,
                 frame_callback: Optional[Callable[[object], None]] = None,
                 loop: bool = False):
        """
        Initializes the VideoFileSource.

        The video file is not opened here; that happens in ``initialize()``,
        which ``start()`` and ``get_frame()`` call on demand.

        Args:
            file_path (str): The path to the video file.
            data_callback (Optional[Callable[[object], None]]): A callback function to send data to the pipeline.
            frame_callback (Optional[Callable[[object], None]]): A callback function for raw frame updates.
            loop (bool): Whether to loop the video when it ends.
        """
        self.file_path = file_path
        self.data_callback = data_callback
        self.frame_callback = frame_callback
        self.loop = loop

        self.cap = None  # cv2.VideoCapture once initialize() has run
        self.running = False  # True while the capture thread should keep reading
        self.thread = None  # background thread running _capture_loop
        self._stop_event = threading.Event()  # set by stop() to wake and end the loop
        self.fps = 0  # filled in by initialize()

    @staticmethod
    def _is_usable_fps(fps) -> bool:
        """Return True when ``fps`` is a positive, finite number."""
        try:
            # NaN fails both comparisons, so it is rejected here as well.
            return 0 < fps < float("inf")
        except TypeError:
            return False

    def _frame_period(self) -> float:
        """Return the target number of seconds per frame, never raising on a bad ``fps``."""
        fps = self.fps if self._is_usable_fps(self.fps) else self.DEFAULT_FPS
        return 1.0 / fps

    def initialize(self) -> bool:
        """
        Initializes the video capture from the file.

        Opens ``file_path`` and stores the reported frame rate in ``fps``,
        falling back to 30 when the reported value is zero, negative or not a
        finite number.

        Returns:
            bool: True if initialization is successful, False otherwise.
        """
        print(f"Initializing video source from {self.file_path}...")
        self.cap = cv2.VideoCapture(self.file_path)
        if not self.cap.isOpened():
            print(f"Error: Could not open video file {self.file_path}.")
            return False

        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        if not self._is_usable_fps(self.fps):
            print("Warning: Could not determine video FPS. Defaulting to 30.")
            self.fps = self.DEFAULT_FPS

        print(f"Video source initialized successfully. FPS: {self.fps}")
        return True

    def start(self):
        """
        Starts the frame reading thread.

        Does nothing if the source is already running. Opens the video first
        when no capture is open; if that fails, no thread is started.
        """
        if self.running:
            print("Video source is already running.")
            return

        if not self.cap or not self.cap.isOpened():
            if not self.initialize():
                return

        self.running = True
        self._stop_event.clear()
        self.thread = threading.Thread(target=self._capture_loop, daemon=True)
        self.thread.start()
        print("Video capture thread started.")

    def stop(self):
        """
        Stops the frame reading thread and releases the capture.

        Waits up to two seconds for the capture thread to finish. It is safe
        to call this from inside a callback (i.e. on the capture thread
        itself); in that case the join is skipped, because a thread cannot
        join itself.
        """
        self.running = False
        # Wake the loop immediately if it is waiting between frames.
        self._stop_event.set()

        worker = self.thread
        if (worker is not None
                and worker is not threading.current_thread()
                and worker.is_alive()):
            worker.join(timeout=2)

        if self.cap and self.cap.isOpened():
            self.cap.release()
        self.cap = None
        print("Video source stopped.")

    def _capture_loop(self):
        """
        The main loop for reading frames from the video file.

        Runs until ``stop()`` is called or the video ends. When ``loop`` is
        set, the video is rewound at the end; if a rewind yields no frame at
        all the loop stops instead of spinning forever. Exceptions raised by
        callbacks are printed and do not end the loop.
        """
        frames_since_rewind = 0
        while self.running and not self._stop_event.is_set():
            # Re-read self.cap every pass: stop() may clear it if this thread
            # outlives the join timeout.
            cap = self.cap
            if cap is None:
                break

            frame_started = time.monotonic()
            ret, frame = cap.read()
            if not ret:
                if self.loop and frames_since_rewind > 0:
                    print("Video ended, looping...")
                    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                    frames_since_rewind = 0
                    continue
                if self.loop:
                    # Nothing was readable even right after a rewind (or on
                    # the very first read); rewinding again cannot help.
                    print("Error: No frames could be read from the video; stopping.")
                else:
                    print("Video ended.")
                self.running = False
                break
            frames_since_rewind += 1

            if self.data_callback:
                try:
                    self.data_callback(frame)
                except Exception as e:
                    print(f"Error in data_callback: {e}")

            if self.frame_callback:
                try:
                    self.frame_callback(frame)
                except Exception as e:
                    print(f"Error in frame_callback: {e}")

            # Control frame rate: wait only for what is left of this frame's
            # time slot, and wait on the stop event so stop() is not delayed.
            remaining = self._frame_period() - (time.monotonic() - frame_started)
            if remaining > 0:
                self._stop_event.wait(remaining)

    def set_data_callback(self, callback: Callable[[object], None]):
        """
        Sets the data callback function.

        Args:
            callback (Callable[[object], None]): Called with each frame read by the capture loop.
        """
        self.data_callback = callback

    def get_frame(self) -> Optional[object]:
        """
        Gets a single frame from the video. Not recommended for continuous capture.

        Opens the video first when no capture is open. The read advances the
        same capture object that the capture thread uses.

        Returns:
            Optional[object]: The next frame, or None if the video could not be
            opened or no frame could be read.
        """
        if not self.cap or not self.cap.isOpened():
            if not self.initialize():
                return None

        ret, frame = self.cap.read()
        if not ret:
            return None
        return frame
|