在本文中,将学习如何利用计算机视觉技术构建一个实时监控道路拥堵情况的系统。将从单个摄像头的图像处理开始,逐步扩展到多个摄像头的拥堵分析,并最终实现对纽约市道路摄像头数据流的监控。
对于项目,将使用在Roboflow Universe上已经可用的汽车检测模型。可以使用流处理技术来处理几乎所有的视觉模型,包括预训练的Universe模型、自己的Roboflow模型,或者在COCO数据集上训练的YOLO模型。
对于不同的用例或生产用途,可能需要训练一个新的模型或使用自己的数据进行微调。可以查看如何训练YOLOv8或YOLOv9的指南。一旦选择了一个预训练的模型或训练了自己的模型,就可以进入下一步。
在处理多个摄像头之前,让先从处理一个视频流开始。为此,将使用Inference包中的InferencePipeline功能。InferencePipeline在输入流(称为视频引用)和输出处理(称为“接收器”)的系统上工作。视频引用可以是本地摄像头设备、视频文件或链接或流的URL。在这个项目中,将使用纽约市交通部提供的实时网络摄像头流URL。
首先,将尝试使用默认的接收器render_boxes来运行InferencePipeline,以渲染并输出边界框。
from inference import InferencePipeline
from inference.core.interfaces.stream.sinks import render_boxes
pipeline = InferencePipeline.init(
model_id="vehicle-detection-3mmwj/1",
max_fps=0.5,
confidence=0.3,
video_reference="https://webcams.nyctmc.org/api/cameras/053e8995-f8cb-4d02-a659-70ac7c7da5db/image",
on_prediction=render_boxes,
api_key="*ROBOFLOW_API_KEY*"
)
pipeline.start()
pipeline.join()
启动InferencePipeline后,开始看到模型对实时帧的预测,可以看到时代广场附近拥挤的街道。
现在,让创建一个自定义输出接收器。对于这个项目,希望输出接收器做两件事:创建一个实时流的时间延迟视频以供以后查看,以及计算并记录任何时候检测到的车辆数量。
首先,将进行一些设置:为了记录车辆,将创建并添加到一个Google表格(使用gspread包)。为了录制视频,将使用OpenCV(cv2)包中的VideoWriter。由于实时流将无限期地进行,当想要停止处理时,需要一种方法来释放VideoWriter,以便它产生一个有效的视频文件。为此,将使用signal包来拦截键盘中断并关闭视频写入器。
from inference import InferencePipeline
from datetime import datetime
import pytz
import supervision as sv
# Google Colab
from google.colab import auth, userdata
api_key = userdata.get('ROBOFLOW_API_KEY')
# Google Sheet Setup
import gspread
from google.auth import default
auth.authenticate_user()
creds, _ = default()
googlesheets = gspread.authorize(creds)
document = googlesheets.open_by_key('1tNGjQSJQqQ7j9BoIw4VcxPn_DIcai8zxv_IwcSRlh34')
worksheet = document.worksheet('SingleCameraTest')
# VideoWriter Setup
import cv2
video_info = (352, 240, 60) # The size of the stream
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
writer = cv2.VideoWriter("nyc_traffic_timelapse.mp4", fourcc, video_info[2], video_info[:2])
# Interrupt Handling
import signal
import sys
def signal_handler(sig, frame):
writer.release()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
然后,将定义回调函数,再次定义InferencePipeline,并用新自定义接收器替换on_prediction默认接收器。
from inference import InferencePipeline
from datetime import datetime
import pytz
def on_prediction(predictions, video_frame, writer):
# Process Results
detections = sv.Detections.from_inference(predictions)
annotated_frame = sv.BoundingBoxAnnotator(
thickness=1
).annotate(video_frame.image, detections)
# Add Frame To Timelapse
writer.write(annotated_frame)
# Format data for Google Sheets
ET = pytz.timezone('America/New_York')
time = datetime.now(ET).strftime("%H:%M")
fields = [time, len(detections)]
print(fields)
# Add to Google Sheet
worksheet.append_rows([fields], "USER_ENTERED")
pipeline = InferencePipeline.init(
model_id="vehicle-detection-3mmwj/1",
max_fps=0.5,
confidence=0.3,
video_reference="https://webcams.nyctmc.org/api/cameras/053e8995-f8cb-4d02-a659-70ac7c7da5db/image",
on_prediction=lambda predictions, video_frame: on_prediction(predictions, video_frame, writer),
api_key=api_key
)
pipeline.start()
pipeline.join()
随着流开始被处理,看到Google表格开始填充车辆计数。
InferencePipeline使得在摄像头流上运行计算机视觉模型变得简单,通过对代码进行一些修改,可以使其在几个不同的流上运行。
对于这个项目,将使用三个不同的街道摄像头来跟踪流URL和街道位置。还需要为每个摄像头创建单独的VideoWriter实例。
cameras = {
"5th Ave @ 34 St": "https://webcams.nyctmc.org/api/cameras/3a3d7bc0-7f35-46ba-9cca-75fe83aac34d/image",
"2 Ave @ 74 St": "https://webcams.nyctmc.org/api/cameras/6316453d-6161-4b98-a8e7-0e36c69d267c/image",
"E 14 St @ Irving Pl": "https://webcams.nyctmc.org/api/cameras/f9cb9d4c-10ad-42e4-8997-dbc9e12bd55a/image"
}
camera_writers = [
cv2.VideoWriter(f"{location}.mp4", fourcc, video_info[2], video_info[:2]) for location in cameras.keys()
]
然后,将修改接收器并创建一个摄像头处理函数。
from inference.core.interfaces.stream.inference_pipeline import SinkMode
def process_camera(predictions, video_frame, location):
# Process Results
detections = sv.Detections.from_inference(predictions)
annotated_frame = sv.BoundingBoxAnnotator(
thickness=1
).annotate(video_frame.image, detections)
vehicles = len(detections)
# Add to Google Sheet
ET = pytz.timezone('America/New_York')
time = datetime.now(ET).strftime("%H:%M")
worksheet = document.worksheet(location)
print(location,"has",vehicles,"cars")
fields = [time, vehicles]
worksheet.append_rows([fields], "USER_ENTERED")
return annotated_frame
def on_prediction(predictions, video_frame, camera_writers):
idx = video_frame.source_id
annotated_frame = process_camera(predictions,video_frame,list(cameras.keys())[idx])
camera_writers[idx].write(annotated_frame)
pipeline = InferencePipeline.init(
model_id="vehicle-detection-3mmwj/1",
max_fps=0.5,
confidence=0.3,
video_reference=list(cameras.values()),
on_prediction=lambda predictions, video_frame: on_prediction(predictions, video_frame, camera_writers),
api_key=api_key,
sink_mode=SinkMode.SEQUENTIAL # Sequential mode means each prediction will trigger one sink call
)
pipeline.start()
pipeline.join()
一旦启动了管道,表格将再次开始填充,一旦停止它,可以将生成的图表与时间延迟视频结合起来。