实时交通拥堵系统构建指南

在本文中,将学习如何利用计算机视觉技术构建一个实时监控道路拥堵情况的系统。将从单个摄像头的图像处理开始,逐步扩展到多个摄像头的拥堵分析,并最终实现对纽约市道路摄像头数据流的监控。

创建或使用计算机视觉模型

对于项目,将使用在Roboflow Universe上已经可用的汽车检测模型。可以使用流处理技术来处理几乎所有的视觉模型,包括预训练的Universe模型、自己的Roboflow模型,或者在COCO数据集上训练的YOLO模型。

对于不同的用例或生产用途,可能需要训练一个新的模型或使用自己的数据进行微调。可以查看如何训练YOLOv8或YOLOv9的指南。一旦选择了一个预训练的模型或训练了自己的模型,就可以进入下一步。

单摄像头流处理

在处理多个摄像头之前,让先从处理一个视频流开始。为此,将使用Inference包中的InferencePipeline功能。InferencePipeline在输入流(称为视频引用)和输出处理(称为“接收器”)的系统上工作。视频引用可以是本地摄像头设备、视频文件或链接或流的URL。在这个项目中,将使用纽约市交通部提供的实时网络摄像头流URL。

首先,将尝试使用默认的接收器render_boxes来运行InferencePipeline,以渲染并输出边界框。

from inference import InferencePipeline from inference.core.interfaces.stream.sinks import render_boxes pipeline = InferencePipeline.init( model_id="vehicle-detection-3mmwj/1", max_fps=0.5, confidence=0.3, video_reference="https://webcams.nyctmc.org/api/cameras/053e8995-f8cb-4d02-a659-70ac7c7da5db/image", on_prediction=render_boxes, api_key="*ROBOFLOW_API_KEY*" ) pipeline.start() pipeline.join()

启动InferencePipeline后,开始看到模型对实时帧的预测,可以看到时代广场附近拥挤的街道。

自定义输出接收器

现在,让创建一个自定义输出接收器。对于这个项目,希望输出接收器做两件事:创建一个实时流的时间延迟视频以供以后查看,以及计算并记录任何时候检测到的车辆数量。

首先,将进行一些设置:为了记录车辆,将创建并添加到一个Google表格(使用gspread包)。为了录制视频,将使用OpenCV(cv2)包中的VideoWriter。由于实时流将无限期地进行,当想要停止处理时,需要一种方法来释放VideoWriter,以便它产生一个有效的视频文件。为此,将使用signal包来拦截键盘中断并关闭视频写入器。

from inference import InferencePipeline from datetime import datetime import pytz import supervision as sv # Google Colab from google.colab import auth, userdata api_key = userdata.get('ROBOFLOW_API_KEY') # Google Sheet Setup import gspread from google.auth import default auth.authenticate_user() creds, _ = default() googlesheets = gspread.authorize(creds) document = googlesheets.open_by_key('1tNGjQSJQqQ7j9BoIw4VcxPn_DIcai8zxv_IwcSRlh34') worksheet = document.worksheet('SingleCameraTest') # VideoWriter Setup import cv2 video_info = (352, 240, 60) # The size of the stream fourcc = cv2.VideoWriter_fourcc(*'mp4v') writer = cv2.VideoWriter("nyc_traffic_timelapse.mp4", fourcc, video_info[2], video_info[:2]) # Interrupt Handling import signal import sys def signal_handler(sig, frame): writer.release() sys.exit(0) signal.signal(signal.SIGINT, signal_handler)

然后,将定义回调函数,再次定义InferencePipeline,并用新自定义接收器替换on_prediction默认接收器。

from inference import InferencePipeline from datetime import datetime import pytz def on_prediction(predictions, video_frame, writer): # Process Results detections = sv.Detections.from_inference(predictions) annotated_frame = sv.BoundingBoxAnnotator( thickness=1 ).annotate(video_frame.image, detections) # Add Frame To Timelapse writer.write(annotated_frame) # Format data for Google Sheets ET = pytz.timezone('America/New_York') time = datetime.now(ET).strftime("%H:%M") fields = [time, len(detections)] print(fields) # Add to Google Sheet worksheet.append_rows([fields], "USER_ENTERED") pipeline = InferencePipeline.init( model_id="vehicle-detection-3mmwj/1", max_fps=0.5, confidence=0.3, video_reference="https://webcams.nyctmc.org/api/cameras/053e8995-f8cb-4d02-a659-70ac7c7da5db/image", on_prediction=lambda predictions, video_frame: on_prediction(predictions, video_frame, writer), api_key=api_key ) pipeline.start() pipeline.join()

随着流开始被处理,看到Google表格开始填充车辆计数。

多摄像头流处理

InferencePipeline使得在摄像头流上运行计算机视觉模型变得简单,通过对代码进行一些修改,可以使其在几个不同的流上运行。

对于这个项目,将使用三个不同的街道摄像头来跟踪流URL和街道位置。还需要为每个摄像头创建单独的VideoWriter实例。

cameras = { "5th Ave @ 34 St": "https://webcams.nyctmc.org/api/cameras/3a3d7bc0-7f35-46ba-9cca-75fe83aac34d/image", "2 Ave @ 74 St": "https://webcams.nyctmc.org/api/cameras/6316453d-6161-4b98-a8e7-0e36c69d267c/image", "E 14 St @ Irving Pl": "https://webcams.nyctmc.org/api/cameras/f9cb9d4c-10ad-42e4-8997-dbc9e12bd55a/image" } camera_writers = [ cv2.VideoWriter(f"{location}.mp4", fourcc, video_info[2], video_info[:2]) for location in cameras.keys() ]

然后,将修改接收器并创建一个摄像头处理函数。

from inference.core.interfaces.stream.inference_pipeline import SinkMode def process_camera(predictions, video_frame, location): # Process Results detections = sv.Detections.from_inference(predictions) annotated_frame = sv.BoundingBoxAnnotator( thickness=1 ).annotate(video_frame.image, detections) vehicles = len(detections) # Add to Google Sheet ET = pytz.timezone('America/New_York') time = datetime.now(ET).strftime("%H:%M") worksheet = document.worksheet(location) print(location,"has",vehicles,"cars") fields = [time, vehicles] worksheet.append_rows([fields], "USER_ENTERED") return annotated_frame def on_prediction(predictions, video_frame, camera_writers): idx = video_frame.source_id annotated_frame = process_camera(predictions,video_frame,list(cameras.keys())[idx]) camera_writers[idx].write(annotated_frame) pipeline = InferencePipeline.init( model_id="vehicle-detection-3mmwj/1", max_fps=0.5, confidence=0.3, video_reference=list(cameras.values()), on_prediction=lambda predictions, video_frame: on_prediction(predictions, video_frame, camera_writers), api_key=api_key, sink_mode=SinkMode.SEQUENTIAL # Sequential mode means each prediction will trigger one sink call ) pipeline.start() pipeline.join()

一旦启动了管道,表格将再次开始填充,一旦停止它,可以将生成的图表与时间延迟视频结合起来。

沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485