8 changes: 7 additions & 1 deletion .gitignore
@@ -170,4 +170,10 @@ cython_debug/
**/*.run
.vscode/
setup/zed_sdk.run
cuda/
homework/yolov8n.pt
homework/yolo11n.pt
GEMstack/knowledge/detection/yolov8n.pt
GEMstack/knowledge/detection/yolo11n.pt
yolov8n.pt
yolo11n.pt
Binary file removed GEMstack/knowledge/detection/yolo11n.pt
248 changes: 157 additions & 91 deletions GEMstack/onboard/perception/pedestrian_detection.py
@@ -2,112 +2,178 @@
from ..interface.gem import GEMInterface
from ..component import Component
from ultralytics import YOLO
import os
import cv2
import rospy
from cv_bridge import CvBridge, CvBridgeError
from sensor_msgs.msg import Image
from typing import Dict


def box_to_fake_agent(box):
    """Creates a fake agent state from an (x,y,w,h) bounding box.

    The location and size are only meaningful in image space, since this just gives a
    2D pixel location rather than a 3D world position.
    """
    x, y, w, h = box
    pose = ObjectPose(t=0, x=x+w/2, y=y+h/2, z=0, yaw=0, pitch=0, roll=0, frame=ObjectFrameEnum.CURRENT)
    dims = (w, h, 0)
    return AgentState(pose=pose, dimensions=dims, outline=None, type=AgentEnum.PEDESTRIAN, activity=AgentActivityEnum.MOVING, velocity=(0,0,0), yaw_rate=0)
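
# Example (sanity check, illustrative values): box_to_fake_agent((100, 50, 40, 80))
# returns an AgentState whose pose is centered at (x + w/2, y + h/2) = (120, 90)
# in the CURRENT frame, with dimensions (40, 80, 0).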


class PedestrianDetector2D(Component):
    """Detects pedestrians.

    self.vehicle_interface -> GEM car interface
    self.detector -> pedestrian detection model
    self.last_person_boxes -> stores the boxes detected in each frame of the video
    self.pedestrians -> stores the AgentStates (pedestrians) found during the video in a dict, keyed by track id (not sure if required; can be removed)
    self.visualization -> enable this to view detected pedestrians
    self.confidence -> minimum model confidence level
    self.classes_to_detect -> classes for the model to detect
    """
    def __init__(self, vehicle_interface : GEMInterface):
        self.vehicle_interface = vehicle_interface
        self.detector = YOLO(os.getcwd() + '/GEMstack/knowledge/detection/yolov8n.pt')  # TODO: get the model path from a sys arg
        self.last_person_boxes = []
        self.pedestrians = {}
        self.visualization = True  # set to True for visualization; TODO: get the value from a sys arg
        self.confidence = 0.7
        self.classes_to_detect = 0  # class 0 = 'person' in the COCO label set

    def rate(self):
        return 4.0

    def state_inputs(self):
        return ['vehicle']

    def state_outputs(self):
        return ['agents']

    def initialize(self):
        """Initializes subscribers and publishers.

        self.vehicle_interface.subscribe_sensor -> GEM car camera subscription
        self.rosbag_test -> ROS bag topic subscription
        self.vis -> publishes the annotated image for visualization
        self.pub_image -> publishes the image with detections for visualization
        """
        # Tell the vehicle to use image_callback whenever 'front_camera' gets a reading;
        # it expects images of type cv2.Mat.

        # GEM car subscriber
        # self.vehicle_interface.subscribe_sensor('front_camera', self.image_callback, cv2.Mat)

        # Initialize cv bridge
        self.bridge = CvBridge()

        # Subscribe to webcam
        # rospy.Subscriber('/webcam', Image, self.image_callback)

        # Subscribe to the GEM e4 front camera (also used when testing with a ROS bag)
        self.rosbag_test = rospy.Subscriber('/oak/rgb/image_raw', Image, self.image_callback, queue_size=1)

        # Publish visualization
        self.vis = rospy.Publisher("pedestrian_detection/annotate", Image, queue_size=1)
        if self.visualization:
            self.pub_image = rospy.Publisher("/camera/image_detection", Image, queue_size=1)

        # Publish AgentState data  TODO: need to implement a custom message type for AgentState
        # self.agent_state = rospy.Publisher("pedestrian_detection/agent_state", AgentState, queue_size=1)
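
        # A possible layout for that custom message (hypothetical sketch, not part of
        # this PR), mirroring the AgentState fields used in this file, e.g. AgentStateMsg.msg:
        #   float64 t
        #   float64 x
        #   float64 y
        #   float64 z
        #   float64 yaw
        #   float64 pitch
        #   float64 roll
        #   float64[3] dimensions   # (w, h, 0) from the bounding box
        #   float64[3] velocity
        #   float64 yaw_rate
        #   uint8 type              # AgentEnum.PEDESTRIAN
        #   uint8 activity          # AgentActivityEnum.MOVING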

    # Use cv2.Mat for the GEM car, Image for a ROS bag
    def image_callback(self, image : Image):  # image : cv2.Mat for the GEM car

        """Detects pedestrians with the provided model whenever a new image arrives.

        Converts the Image msg to cv2 format (might need to change this to use cv2.Mat)
        and uses the model to detect pedestrians. If visualization is enabled, publishes
        an image annotated with the detected pedestrians.

        Hardcoded values for now:
            classes to detect -> pedestrians only (class = 0)
            confidence level -> 0.7
        """

        # Use the Image directly for the GEM car
        # track_result = self.detector.track(source=image, classes=self.classes_to_detect, persist=True, conf=self.confidence)

        # Convert to cv2 format for a ROS bag
        image = self.bridge.imgmsg_to_cv2(image, "bgr8")
        track_result = self.detector.track(source=image, classes=self.classes_to_detect, persist=True, conf=self.confidence)

        self.last_person_boxes = []
        boxes = track_result[0].boxes

        # Used for visualization
        if self.visualization:
            label_text = "Pedestrian "
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 0.5
            font_color = (255, 255, 255)  # white text
            outline_color = (0, 0, 0)     # black outline
            line_type = 1
            text_thickness = 2            # text thickness
            outline_thickness = 1         # thickness of the text outline

        # Unpack the detected box dimensions into x, y, w, h
        for box in boxes:

            xywh = box.xywh[0].tolist()
            self.last_person_boxes.append(xywh)
            x, y, w, h = xywh
            if box.id is None:
                # The tracker has not assigned an id yet (e.g., on the first frame); skip.
                continue
            track_id = box.id.item()

            # Store the AgentState in a dict; can be removed if not required
            pose = ObjectPose(t=0, x=x, y=y, z=0, yaw=0, pitch=0, roll=0, frame=ObjectFrameEnum.CURRENT)
            dims = (w, h, 0)
            if track_id not in self.pedestrians:
                self.pedestrians[track_id] = AgentState(pose=pose, dimensions=dims, outline=None, type=AgentEnum.PEDESTRIAN, activity=AgentActivityEnum.MOVING, velocity=(0,0,0), yaw_rate=0)
            else:
                self.pedestrians[track_id].pose = pose
                self.pedestrians[track_id].dimensions = dims

            # Used for visualization
            if self.visualization:
                # Draw bounding box
                cv2.rectangle(image, (int(x - w / 2), int(y - h / 2)), (int(x + w / 2), int(y + h / 2)), (255, 0, 255), 3)

                # Define text label
                x = int(x - w / 2)
                y = int(y - h / 2)
                label = label_text + str(track_id) + " : " + str(round(box.conf.item(), 2))

                # Get text size
                text_size, baseline = cv2.getTextSize(label, font, font_scale, line_type)
                text_w, text_h = text_size

                # Position text above the bounding box
                text_x = x
                text_y = y - 10 if y - 10 > 10 else int(y + h + text_h)

                # Draw text outline for better visibility; uncomment for outline
                # for dx, dy in [(-1, -1), (-1, 1), (1, -1), (1, 1)]:
                #     cv2.putText(image, label, (text_x + dx, text_y - baseline + dy), font, font_scale, outline_color, outline_thickness)

                # Draw main text on top of the outline
                cv2.putText(image, label, (text_x, text_y - baseline), font, font_scale, font_color, text_thickness)

        # Used for visualization
        if self.visualization:
            ros_img = self.bridge.cv2_to_imgmsg(image, 'bgr8')
            self.pub_image.publish(ros_img)

        # Uncomment to debug the detector...
        # print(self.last_person_boxes)
        # print(self.pedestrians.keys())
        # for bb in self.last_person_boxes:
        #     x, y, w, h = bb
        #     cv2.rectangle(image, (int(x-w/2), int(y-h/2)), (int(x+w/2), int(y+h/2)), (255, 0, 255), 3)
        # cv2.imwrite("pedestrian_detections.png", image)

    # NOTE: this second definition of image_callback overrides the one above,
    # since Python keeps only the last method defined with a given name.
    def image_callback(self, image: Image):
        # Convert image to cv2 format
        try:
            cv_image = self.bridge.imgmsg_to_cv2(image, "bgr8")
        except CvBridgeError as e:
            print(e)
            return  # cv_image is undefined if the conversion failed

        detection_result = self.detector(cv_image)
        self.last_person_boxes = []

        # Extract bounding boxes (x, y, w, h)
        data = detection_result[0].boxes
        cls = data.cls.tolist()
        conf = data.conf.tolist()
        bboxes = data.xywh.tolist()

        for k in range(len(cls)):
            category = cls[k]
            confidence = conf[k]

            b = bboxes[k]
            bb = (b[0], b[1], b[2], b[3])
            if category == 0 and confidence > 0.5:  # class 0 = person
                self.last_person_boxes.append(bb)

        # Insert bounding boxes into AgentState objects
        agent_state = self.update()
        print(f"agent_state: {agent_state}")

        # Publish AgentState object  TODO: need to implement a custom message type for AgentState
        # self.agent_state.publish(agent_state)

        # Add annotated visualization
        font = cv2.FONT_HERSHEY_SIMPLEX
        for id, bb in enumerate(self.last_person_boxes):
            x, y, w, h = bb
            cv2.rectangle(cv_image, (int(x-w/2), int(y-h/2)), (int(x+w/2), int(y+h/2)), (255, 0, 255), 3)
            cv2.putText(cv_image, f'Person: {id + 1}', (int(x-w/2), int(y-h/2) - 15), font, 0.7, (255, 0, 255), 2, cv2.LINE_AA)
        cv2.putText(cv_image, f'Detected {len(agent_state)} pedestrians', (50, 40), font, 0.8, (255, 0, 0), 2, cv2.LINE_AA)

        # Publish visualization topic
        if cv_image is not None:
            out_vis_msg = self.bridge.cv2_to_imgmsg(cv_image, 'bgr8')
            self.vis.publish(out_vis_msg)

    def update(self, vehicle : VehicleState = None) -> Dict[str, AgentState]:
        res = {}
        for i, b in enumerate(self.last_person_boxes):
            x, y, w, h = b
            res['pedestrian' + str(i)] = box_to_fake_agent(b)
        if len(res) > 0:
            print("Detected", len(res), "pedestrians")
        return res
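        # Example return value (illustrative): with two boxes in last_person_boxes,
        # update() yields {'pedestrian0': AgentState(...), 'pedestrian1': AgentState(...)}.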


class FakePedestrianDetector2D(Component):
Binary file removed knowledge/detection/yolo11n.pt
Binary file removed yolo11n.pt