diff --git a/.gitignore b/.gitignore
index 5d34da6bb..61a2df38b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -166,6 +166,14 @@ cython_debug/
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/
+# ZED run files
+**/*.run
 .vscode/
 setup/zed_sdk.run
-cuda/
\ No newline at end of file
+cuda/
+homework/yolov8n.pt
+homework/yolo11n.pt
+GEMstack/knowledge/detection/yolov8n.pt
+GEMstack/knowledge/detection/yolo11n.pt
+yolov8n.pt
+yolo11n.pt
\ No newline at end of file
diff --git a/GEMstack/onboard/perception/pedestrian_detection.py b/GEMstack/onboard/perception/pedestrian_detection.py
new file mode 100644
index 000000000..f91145274
--- /dev/null
+++ b/GEMstack/onboard/perception/pedestrian_detection.py
@@ -0,0 +1,204 @@
+from ...state import AllState,VehicleState,ObjectPose,ObjectFrameEnum,AgentState,AgentEnum,AgentActivityEnum
+from ..interface.gem import GEMInterface
+from ..component import Component
+from ultralytics import YOLO
+import cv2
+from typing import Dict
+from cv_bridge import CvBridge
+from sensor_msgs.msg import Image
+import rospy
+import os
+
+
+def box_to_fake_agent(box):
+    """Creates a fake agent state from an (x,y,w,h) bounding box.
+
+    The location and size are pretty much meaningless since this is just giving a 2D location.
+    """
+    x,y,w,h = box
+    pose = ObjectPose(t=0,x=x+w/2,y=y+h/2,z=0,yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT)
+    dims = (w,h,0)
+    return AgentState(pose=pose,dimensions=dims,outline=None,type=AgentEnum.PEDESTRIAN,activity=AgentActivityEnum.MOVING,velocity=(0,0,0),yaw_rate=0)
+
+
+class PedestrianDetector2D(Component):
+    """Detects pedestrians.
+
+    self.vehicle_interface -> GEM vehicle interface
+    self.detector -> pedestrian detection model
+    self.last_person_boxes -> stores the boxes detected in each frame of the video
+    self.pedestrians -> stores the AgentState of each pedestrian found during the video in a dict (may be removed if not required)
+    self.visualization -> enable this to view detected pedestrians
+    self.confidence -> minimum model confidence level
+    self.classes_to_detect -> classes for the model to detect
+
+    """
+    def __init__(self,vehicle_interface : GEMInterface):
+        self.vehicle_interface = vehicle_interface
+        self.detector = YOLO(os.getcwd()+'/GEMstack/knowledge/detection/yolov8n.pt') # TODO: change to get the model path from a sys arg
+        self.last_person_boxes = []
+        self.pedestrians = {}
+        self.visualization = True # Set this to True for visualization; later change to get the value from a sys arg
+        self.confidence = 0.7
+        self.classes_to_detect = 0
+
+    def rate(self):
+        return 4.0
+
+    def state_inputs(self):
+        return ['vehicle']
+
+    def state_outputs(self):
+        return ['agents']
+
+    def initialize(self):
+
+        """Initializes subscribers and publishers
+
+        self.vehicle_interface.subscribe_sensor -> GEM car camera subscription
+        self.rosbag_test -> ROS bag topic subscription
+        self.pub_image -> publishes the image with detections for visualization
+
+        """
+        # tell the vehicle to use image_callback whenever 'front_camera' gets a reading, and it expects images of type cv2.Mat
+
+        # GEM Car subscriber
+        # self.vehicle_interface.subscribe_sensor('front_camera',self.image_callback,cv2.Mat)
+
+        # Webcam
+        # rospy.Subscriber('/webcam', Image, self.image_callback)
+
+        # Testing with rosbag
+        self.rosbag_test = rospy.Subscriber('/oak/rgb/image_raw',Image, self.image_callback,queue_size=1)
+        if(self.visualization):
+            self.pub_image = rospy.Publisher("/camera/image_detection", Image, queue_size=1)
+
+    # Use cv2.Mat for GEM Car, Image for RosBag
+    def image_callback(self, image : Image): #image : cv2.Mat):
+
+        """Detects pedestrians using the provided model whenever a new image arrives.
+
+        Converts the Image msg to cv2 format (might need to change this to use cv2.Mat) and uses the model to detect pedestrians.
+        If visualization is true, publishes an image with the detected pedestrians.
+
+        Hardcoded values for now:
+        Detect only pedestrians -> class = 0
+        Confidence level -> 0.7
+
+        """
+
+        # Use Image directly for GEM Car
+        # track_result = self.detector.track(source=image, classes=self.classes_to_detect, persist=True, conf=self.confidence)
+
+        # Convert to CV2 format for RosBag
+        bridge = CvBridge()
+        image = bridge.imgmsg_to_cv2(image, "bgr8")
+        track_result = self.detector.track(source=image, classes=self.classes_to_detect, persist=True, conf=self.confidence)
+
+        self.last_person_boxes = []
+        boxes = track_result[0].boxes
+
+        # Used for visualization
+        if(self.visualization):
+            label_text = "Pedestrian "
+            font = cv2.FONT_HERSHEY_SIMPLEX
+            font_scale = 0.5
+            font_color = (255, 255, 255) # White text
+            outline_color = (0, 0, 0)    # Black outline
+            line_type = 1
+            text_thickness = 2    # Text thickness
+            outline_thickness = 1 # Thickness of the text outline
+
+        # Unpack the detected box dimensions into x,y,w,h
+        for box in boxes:
+
+            xywh = box.xywh[0].tolist()
+            self.last_person_boxes.append(xywh)
+            x, y, w, h = xywh
+            id = box.id.item()
+
+            # Stores AgentState in a dict, can be removed if not required
+            pose = ObjectPose(t=0,x=x,y=y,z=0,yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT)
+            dims = (w,h,0)
+            if(id not in self.pedestrians.keys()):
+                self.pedestrians[id] = AgentState(pose=pose,dimensions=dims,outline=None,type=AgentEnum.PEDESTRIAN,activity=AgentActivityEnum.MOVING,velocity=(0,0,0),yaw_rate=0)
+            else:
+                self.pedestrians[id].pose = pose
+                self.pedestrians[id].dimensions = dims
+
+            # Used for visualization
+            if(self.visualization):
+                # Draw bounding box
+                cv2.rectangle(image, (int(x - w / 2), int(y - h / 2)), (int(x + w / 2), int(y + h / 2)), (255, 0, 255), 3)
+
+                # Define text label
+                x = int(x - w / 2)
+                y = int(y - h / 2)
+                label = label_text + str(id) + " : " + str(round(box.conf.item(), 2))
+
+                # Get text size
+                text_size, baseline = cv2.getTextSize(label, font, font_scale, line_type)
+                text_w, text_h = text_size
+
+                # Position text above the bounding box
+                text_x = x
+                text_y = y - 10 if y - 10 > 10 else y + h + text_h
+
+                # Draw text outline for better visibility, uncomment for outline
+                # for dx, dy in [(-1, -1), (-1, 1), (1, -1), (1, 1)]:
+                #     cv2.putText(image, label, (text_x + dx, text_y - baseline + dy), font, font_scale, outline_color, outline_thickness)
+
+                # Draw the label text (on top of the outline if enabled)
+                cv2.putText(image, label, (text_x, text_y - baseline), font, font_scale, font_color, text_thickness)
+
+
+        # Used for visualization
+        if(self.visualization):
+            ros_img = bridge.cv2_to_imgmsg(image, 'bgr8')
+            self.pub_image.publish(ros_img)
+
+        # uncomment if you want to debug the detector...
+        # print(self.last_person_boxes)
+        # print(self.pedestrians.keys())
+        #for bb in self.last_person_boxes:
+        #    x,y,w,h = bb
+        #    cv2.rectangle(image, (int(x-w/2), int(y-h/2)), (int(x+w/2), int(y+h/2)), (255, 0, 255), 3)
+        #cv2.imwrite("pedestrian_detections.png",image)
+
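+    # A possible alternative sketch (an assumption, not wired into the pipeline): update()
+    # below rebuilds fake agents from the raw boxes each frame; it could instead return
+    # the tracked agents accumulated in self.pedestrians, keyed by their track ids, e.g.
+    #
+    #   def update(self, vehicle : VehicleState) -> Dict[str,AgentState]:
+    #       return {'pedestrian' + str(int(pid)): agent for pid, agent in self.pedestrians.items()}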
+
+    def update(self, vehicle : VehicleState) -> Dict[str,AgentState]:
+        res = {}
+        for i,b in enumerate(self.last_person_boxes):
+            x,y,w,h = b
+            res['pedestrian'+str(i)] = box_to_fake_agent(b)
+        if len(res) > 0:
+            print("Detected",len(res),"pedestrians")
+        return res
+
+
+class FakePedestrianDetector2D(Component):
+    """Triggers a pedestrian detection at some preset time ranges"""
+    def __init__(self,vehicle_interface : GEMInterface):
+        self.vehicle_interface = vehicle_interface
+        self.times = [(5.0,20.0),(30.0,35.0)]
+        self.t_start = None
+
+    def rate(self):
+        return 4.0
+
+    def state_inputs(self):
+        return ['vehicle']
+
+    def state_outputs(self):
+        return ['agents']
+
+    def update(self, vehicle : VehicleState) -> Dict[str,AgentState]:
+        if self.t_start is None:
+            self.t_start = self.vehicle_interface.time()
+        t = self.vehicle_interface.time() - self.t_start
+        res = {}
+        for times in self.times:
+            if t >= times[0] and t <= times[1]:
+                res['pedestrian0'] = box_to_fake_agent((0,0,0,0))
+                print("Detected a pedestrian")
+        return res
diff --git a/homework/longitudinal_planning.py b/GEMstack/onboard/planning/longitudinal_planning.py
similarity index 100%
rename from homework/longitudinal_planning.py
rename to GEMstack/onboard/planning/longitudinal_planning.py
diff --git a/homework/pedestrian_yield_logic.py b/GEMstack/onboard/planning/pedestrian_yield_logic.py
similarity index 100%
rename from homework/pedestrian_yield_logic.py
rename to GEMstack/onboard/planning/pedestrian_yield_logic.py
diff --git a/homework/pedestrian_detection.py b/homework/pedestrian_detection.py
deleted file mode 100644
index 3134b1e35..000000000
--- a/homework/pedestrian_detection.py
+++ /dev/null
@@ -1,84 +0,0 @@
-from ...state import AllState,VehicleState,ObjectPose,ObjectFrameEnum,AgentState,AgentEnum,AgentActivityEnum
-from ..interface.gem import GEMInterface
-from ..component import Component
-#from ultralytics import YOLO
-#import cv2
-from typing import Dict
-
-def box_to_fake_agent(box):
-    """Creates a fake agent state from an (x,y,w,h) bounding box.
-
-    The location and size are pretty much meaningless since this is just giving a 2D location.
-    """
-    x,y,w,h = box
-    pose = ObjectPose(t=0,x=x+w/2,y=y+h/2,z=0,yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT)
-    dims = (w,h,0)
-    return AgentState(pose=pose,dimensions=dims,outline=None,type=AgentEnum.PEDESTRIAN,activity=AgentActivityEnum.MOVING,velocity=(0,0,0),yaw_rate=0)
-
-
-class PedestrianDetector2D(Component):
-    """Detects pedestrians."""
-    def __init__(self,vehicle_interface : GEMInterface):
-        self.vehicle_interface = vehicle_interface
-        #self.detector = YOLO('../../knowledge/detection/yolov8n.pt')
-        self.last_person_boxes = []
-
-    def rate(self):
-        return 4.0
-
-    def state_inputs(self):
-        return ['vehicle']
-
-    def state_outputs(self):
-        return ['agents']
-
-    def initialize(self):
-        #tell the vehicle to use image_callback whenever 'front_camera' gets a reading, and it expects images of type cv2.Mat
-        #self.vehicle_interface.subscribe_sensor('front_camera',self.image_callback,cv2.Mat)
-        pass
-
-    #def image_callback(self, image : cv2.Mat):
-    #    detection_result = self.detector(image)
-    #    self.last_person_boxes = []
-    #    #uncomment if you want to debug the detector...
-    #    #for bb in self.last_person_boxes:
-    #    #    x,y,w,h = bb
-    #    #    cv2.rectangle(image, (int(x-w/2), int(y-h/2)), (int(x+w/2), int(y+h/2)), (255, 0, 255), 3)
-    #    #cv2.imwrite("pedestrian_detections.png",image)
-
-    def update(self, vehicle : VehicleState) -> Dict[str,AgentState]:
-        res = {}
-        for i,b in enumerate(self.last_person_boxes):
-            x,y,w,h = b
-            res['pedestrian'+str(i)] = box_to_fake_agent(b)
-        if len(res) > 0:
-            print("Detected",len(res),"pedestrians")
-        return res
-
-
-class FakePedestrianDetector2D(Component):
-    """Triggers a pedestrian detection at some random time ranges"""
-    def __init__(self,vehicle_interface : GEMInterface):
-        self.vehicle_interface = vehicle_interface
-        self.times = [(5.0,20.0),(30.0,35.0)]
-        self.t_start = None
-
-    def rate(self):
-        return 4.0
-
-    def state_inputs(self):
-        return ['vehicle']
-
-    def state_outputs(self):
-        return ['agents']
-
-    def update(self, vehicle : VehicleState) -> Dict[str,AgentState]:
-        if self.t_start is None:
-            self.t_start = self.vehicle_interface.time()
-        t = self.vehicle_interface.time() - self.t_start
-        res = {}
-        for times in self.times:
-            if t >= times[0] and t <= times[1]:
-                res['pedestrian0'] = box_to_fake_agent((0,0,0,0))
-                print("Detected a pedestrian")
-        return res
diff --git a/homework/person_detector.py b/homework/person_detector.py
deleted file mode 100644
index 8f4fa3588..000000000
--- a/homework/person_detector.py
+++ /dev/null
@@ -1,48 +0,0 @@
-#from ultralytics import YOLO
-import cv2
-import sys
-
-def person_detector(img : cv2.Mat):
-    #TODO: implement me to produce a list of (x,y,w,h) bounding boxes of people in the image
-    return []
-
-def main(fn):
-    image = cv2.imread(fn)
-    bboxes = person_detector(image)
-    print("Detected",len(bboxes),"people")
-    for bb in bboxes:
-        x,y,w,h = bb
-        if not isinstance(x,(int,float)) or not isinstance(y,(int,float)) or not isinstance(w,(int,float)) or not isinstance(h,(int,float)):
-            print("WARNING: make sure to return Python numbers rather than PyTorch Tensors")
-        print("Corner",(x,y),"size",(w,h))
-        cv2.rectangle(image, (int(x-w/2), int(y-h/2)), (int(x+w/2), int(y+h/2)), (255, 0, 255), 3)
-    cv2.imshow('Results', image)
-    cv2.waitKey(0)
-
-def main_webcam():
-    cap = cv2.VideoCapture(0)
-    cap.set(3, 640)
-    cap.set(4, 480)
-
-    print("Press space to exit")
-    while True:
-        _, image = cap.read()
-
-        bboxes = person_detector(image)
-        for bb in bboxes:
-            x,y,w,h = bb
-            cv2.rectangle(image, (int(x-w/2), int(y-h/2)), (int(x+w/2), int(y+h/2)), (255, 0, 255), 3)
-
-        cv2.imshow('Person detection', image)
-        if cv2.waitKey(1) & 0xFF == ord(' '):
-            break
-
-    cap.release()
-
-
-if __name__ == '__main__':
-    fn = sys.argv[1]
-    if fn != 'webcam':
-        main(fn)
-    else:
-        main_webcam()
\ No newline at end of file
diff --git a/launch/gather_data.yaml b/launch/gather_data.yaml
index 4f3d03d4c..16f1b5bb7 100644
--- a/launch/gather_data.yaml
+++ b/launch/gather_data.yaml
@@ -26,7 +26,12 @@ log:
   # If suffix is specified, then logs will output to folder/prefix+suffix. Default uses date and time as the suffix
   #suffix : 'test3'
   # Specify which ros topics to record to vehicle.bag. Default records nothing. This records the "standard" ROS topics.
-  ros_topics : []
+  ros_topics : ['/oak/left/image_raw',
+                '/oak/rgb/image_raw',
+                '/oak/right/image_raw',
+                '/oak/stereo/image_raw',
+                '/ouster/points',
+                '/livox/lidar']
   # Specify options to pass to rosbag record. Default is no options.
   #rosbag_options : '--split --size=1024'
   # If True, then record all readings / commands of the vehicle interface. Default False
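+  # Example (an assumption, for offline testing): replay the recorded camera topic for
+  # the pedestrian detector with e.g. `rosbag play vehicle.bag --topics /oak/rgb/image_raw`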
diff --git a/homework/pedestrian_detection.yaml b/launch/pedestrian_detection.yaml
similarity index 100%
rename from homework/pedestrian_detection.yaml
rename to launch/pedestrian_detection.yaml
diff --git a/webcam.py b/webcam.py
new file mode 100644
index 000000000..52f1b36a8
--- /dev/null
+++ b/webcam.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+import rospy
+import cv2
+from sensor_msgs.msg import Image
+from cv_bridge import CvBridge
+
+def webcam_publisher():
+    rospy.init_node('webcam_publisher', anonymous=True)
+
+    image_pub = rospy.Publisher('/webcam', Image, queue_size=10)
+
+    cap = cv2.VideoCapture(0)
+    if not cap.isOpened():
+        rospy.logerr("cannot open camera")
+        return
+
+    bridge = CvBridge()
+
+    rate = rospy.Rate(30)  # 30 Hz
+
+    try:
+        while not rospy.is_shutdown():
+            ret, frame = cap.read()
+            if not ret:
+                rospy.logerr("cannot read frames")
+                break
+
+            cv2.imshow("Webcam", frame)
+
+            if cv2.waitKey(1) & 0xFF == ord('q'):
+                rospy.loginfo("q pressed, exiting")
+                break
+
+            try:
+                ros_image = bridge.cv2_to_imgmsg(frame, "bgr8")
+            except Exception as e:
+                rospy.logerr(f"failed to convert image: {e}")
+                continue
+
+            image_pub.publish(ros_image)
+            rospy.loginfo("published frame")
+
+            rate.sleep()
+
+    except KeyboardInterrupt:
+        rospy.loginfo("user key interrupt")
+
+    finally:
+        cap.release()
+        cv2.destroyAllWindows()
+        rospy.loginfo("freed camera resources")
+
+if __name__ == '__main__':
+    webcam_publisher()
\ No newline at end of file