在树莓派上实现实时视频流中的人检测

在本系列的第七篇文章中,已经构建了一个用于人检测的深度神经网络(DNN)模型,并编写了Python代码,以便在树莓派设备上运行这个模型。使用MobileNet DNN构建的AI算法在Pi 3B板上的平均速度大约是1.25 FPS。正如在之前的文章中展示的,这个速度足以检测实时摄像头视频流中人的出现。但算法并不是传统意义上的实时算法。

视频监控系统中的实时视频摄像头通常以15-30 FPS的速度运行。算法,以1.25 FPS的速率处理帧,实在是太慢了。是什么让它变慢了呢?让从代码中寻找线索。

以下是在之前文章中开发的代码片段:

fps.start() obj_data = self.ssd.detect(frame) persons = self.ssd.get_objects(frame, obj_data, class_num, min_confidence) fps.stop()

如所见,只测量了人检测操作的速度(在帧上运行SSD模型)。因此,这是需要0.8秒每帧执行的操作。算法的其余部分足够快,可以以实时视频系统的速度运行。

考虑到这一点,能否重新设计算法,以适应处理实时视频流所需的更高速度呢?让尝试一下。

在当前版本的算法中,运行SSD模型处理,大约需要0.8秒每帧,其余的代码可以实时运行,顺序执行。因此,较慢的部分定义了整体速度。将算法适应实时模式的最简单方法是将其分成部分,并并行运行这些部分。“快速”部分将以实时速度运行,“慢速”部分将以1.25 FPS运行。但由于部分将异步运行,可以处理“慢速”部分(SSD模型)以及只有那些值得处理的帧。

以下是实现异步处理思想的新类的代码:

# 实时视频检测器 import sys from multiprocessing import Process from multiprocessing import Queue def detect_in_process(proto, model, ssd_proc, frame_queue, person_queue, class_num, min_confidence): ssd_net = CaffeModelLoader.load(proto, model) ssd = SSD(ssd_proc, ssd_net) while True: if not frame_queue.empty(): frame = frame_queue.get() obj_data = ssd.detect(frame) persons = ssd.get_objects(frame, obj_data, class_num, min_confidence) person_queue.put(persons) class RealtimeVideoSSD: def __init__(self, proto, model, ssd_proc): self.ssd_proc = ssd_proc self.proto = proto self.model = model def detect(self, video, class_num, min_confidence): detection_num = 0 capture = cv2.VideoCapture(video) frame_queue = Queue(maxsize=1) person_queue = Queue(maxsize=1) detect_proc = Process(target=detect_in_process, args=(self.proto, self.model, self.ssd_proc, frame_queue, person_queue, class_num, min_confidence)) detect_proc.daemon = True detect_proc.start() frame_num = 0 persons = None # 捕获所有帧 while True: t1 = time.time() (ret, frame) = capture.read() if frame is None: break if frame_queue.empty(): print("Put into frame queue ..." + str(frame_num)) frame_queue.put(frame) t2 = time.time() dt = t2-t1 if dt<0.040: st = 0.040-dt print("Sleep..." + str(st)) time.sleep(st) if not person_queue.empty(): persons = person_queue.get() print("Get from person queue ..." + str(len(persons))) if (persons is not None) and (len(persons)>0): detection_num += len(persons) Utils.draw_objects(persons, "PERSON", (0, 0, 255), frame) # 显示结果帧 cv2.imshow('Person detection', frame) if cv2.waitKey(1) & 0xFF == ord('q'): break frame_num += 1 capture.release() cv2.destroyAllWindows() return (frame_num, detection_num)

如所见,检测算法被提取到一个单独的函数detect_in_process中。这个函数有两个特殊参数:frame_queue和person_queue。这些队列旨在支持帧捕获过程和人检测过程之间的交互。

让看一下实时人检测的主类RealtimeVideoSSD。它的构造函数接收三个参数:SSD结构的原型,SSD模型,以及帧处理器。注意前两个参数是模型数据的路径,而不是加载的模型本身。模型在执行过程中加载。这使能够避免在进程之间传输大量数据;只传输轻量级字符串数据。

类的detect方法接收视频文件的路径,人类别的数量,以及置信度阈值。首先,它为视频流创建捕获和队列,然后初始化Process类的实例,指定参数。其中一个是detect_in_process函数,它将与进程一起运行。检测进程在后台运行(daemon = True)。

接下来,循环处理所有接收到的帧。当得到一个新帧时,查看帧队列,如果它是空的,将帧放入队列。后台进程查看队列并开始处理帧;完成后,进程将结果放入人队列。

帧处理循环查看人队列,如果有检测,它将在当前帧上绘制。因此,如果在帧中检测到人,那么这个人将显示在检测结果的帧中,而不是原始帧中。

帧处理循环中的另一个技巧是,测量capture.read方法的时间。然后将帧处理转换为睡眠模式以强制它减速。在这种情况下,将速度降低到大约25 FPS(0.040秒 = 1/25 FPS)——测试视频的实际帧率。

现在可以使用以下Python代码以实时模式测试人检测:

# 测试实时模式下的SSD if __name__ == '__main__': proto_file = r"/home/pi/Desktop/PI_RPD/mobilenet.prototxt" model_file = r"/home/pi/Desktop/PI_RPD/mobilenet.caffemodel" proc_frame_size = 300 person_class = 15 # MobileNet的帧处理器 frame_proc = FrameProcessor(proc_frame_size, 1.0/127.5, 127.5) video_file = r"/home/pi/Desktop/PI_RPD/video/persons_4.mp4" video_ssd = RealtimeVideoSSD(proto_file, model_file, frame_proc) (frames, detections) = video_ssd.detect(video_file, person_class, 0.5) print("Frames count: " + str(frames)) print("Detection count: " + str(detections))

以下是在Pi 3B设备上测试运行的屏幕视频:

沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485