خودرو خودران با OpenCV و رزپبری پای

کد کامل پروژه رو میتونید در  https://github.com/htgdokania/Self-Driving-Car ببینید.

جریان کار:

مرحله ۱: ابتدا ما باید اطلاعات دوربین خودرو  را به کامپیوتر منتقل کنیم تا اطلاعات پردازش شود

مرحله ۲: خواندن فریم در سمت سرور و پردازش آن.

مرحله ۳:بررسی نشانه گر توقف قرمز و ارسال اطلاعات به pi درصورت یافتن.

مرحله ۴: تشخیص شیب خطوط در فریم.

  • تشخیص لبه canny را اعمال می کنیم.
  • در مرحله بعد ، منطقه مورد نظر که احتیاج به فیلتر دارد(birds-eye view) را فیلتر می کنیم.
  • بعد ، خطوط را با استفاده از HoughLines تشخیص می دهیم و سپس شیب خطوط شناسایی شده را در منطقه مورد نظر محاسبه میکنیم.
  • با استفاده از این اطلاعات تعداد خطوط چپ و خط راست را پیدا می کنیم.

مرحله ۵: اطلاعات را به pi در مورد (تعداد خطوط چپ ، خطوط راست و وضعیت نشانگر قرمز) بفرستید تا خودرو مطابق ان حرکت کند.

مرحله ۶: کد Raspberry Pi در سمت سرویس گیرنده:

  • دریافت اطلاعات از کامپیوتر درباره فریم پردازش شده (شماره خطوط چپ ، راست و نشانگر قرمز)
  • بررسی مانع /عابر پیاده با استفاده از سنسور اولتراسونیک در Pi.
  • در نهایت ، بر اساس تمام این اطلاعات پردازش شده ، حرکت خودرو را انجام میدهیم.

مرحله ۱:انتقال اطلاعات دوربین Pi به کامپیوتر برای پردازش

در ابتدا Raspberry pi  را با استفاده از سیستم عامل  raspbian  تنظیم کنید.

همچنین ، picamera را روی Raspberry pi نصب کنید و از فعال بودن آن مطمئن شوید.

در حال حاضر ، ما به دو اسکریپت نیاز داریم:

یک سرور(احتمالاً روی یک دستگاه سریع ، مثلاWindows Desktop) که به اتصال از طریق Raspberry Pi گوش می دهد و کلاینت که روی Raspberry Pi کار می کند و یک جریان مداوم از تصاویر را به سرور ارسال می کند.

نکته: همیشه قبل از اجرای کد سرویس گیرنده Raspberry Pi ابتدا سرور را اجرا کنید .

کد برای VideoStreamClient.py:

import io
import socket
import struct
import time
import picamera
 
# create socket and bind host
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(('192.168.31.7', 8000))#replace with the server ip address ,port=8000
connection = client_socket.makefile('wb')
 
try:
    with picamera.PiCamera() as camera:
        camera.resolution = (640, 480)      # pi camera resolution
        camera.framerate = 15               # 15 frames/sec
        time.sleep(2)                       # give 2 secs for camera to initilize
        start = time.time()
        stream = io.BytesIO()
        # send jpeg format video stream
        for foo in camera.capture_continuous(stream, 'jpeg', use_video_port = True):
            connection.write(struct.pack('<L', stream.tell()))
            connection.flush()
            stream.seek(0)
            connection.write(stream.read())
            stream.seek(0)
            stream.truncate()
            print('New frame sent.')       
 
    connection.write(struct.pack('<L', 0))
except:
    pass

کد برای قسمت سرور (اجرا روی دسکتاپ): برای قسمت سرور ما یک کد main.py نوشته ایم که در آن فریم رو از طریق جریان شبکه می خوانیم ، و همینطور برای اقدامات بعدی پردازش می کنیم. (در مرحله بعدی توضیح میدیم)

مرحله ۲: خواندن فریم در سمت سرور و شروع به پردازش:[main.py]

در ابتدا کتاب خانه های مورد نیاز را import میکنیم.

import numpy as np
import cv2
import socket
import send_data_pi 

بعد ، ما یک کلاس با نام VideoStreaming () تعریف می کنیم. توابع مورد نیاز در آن تعریف شده است.

  • در ابتدا متغیرهای مورد نیاز برای قسمت جریان تصویر را در init () مشخص میکنیم.
  • در نهایت ، برای شروع خواندن فریم ها ، ()streaming را فراخوانی میکنیم.
class VideoStreaming(object):
     
    def __init__(self, host, port):        
        self.server_socket = socket.socket()
        self.server_socket.bind(('', port))
        self.server_socket.listen(0)
        self.connection, self.client_address = self.server_socket.accept()
        self.connection = self.connection.makefile('rb')
        self.host_name = socket.gethostname()
        self.host_ip = socket.gethostbyname(self.host_name)
        self.streaming()

بعد ، Streaming fucntion () رو مشخص میکنیم. ابتدا فریم رو می خونیم و آن رو در “image” ذخیره می کنیم. بعلاوه در داخل این تابع ، ما همه توابع دیگه رو برای پردازش این “image” از جمله ()checkforred () ، canny () ، roi () ، avg_lines فراخوانی می کنیم.

def streaming(self):
    try:
        print("Host: ", self.host_name + ' ' + self.host_ip)
        print("Connection from: ", self.client_address)
        print("Streaming...")
        print("Press 'q' to exit")
 
        # need bytes here
        stream_bytes = b' '
        while True:
            stream_bytes += self.connection.read(1024)
            first = stream_bytes.find(b'\xff\xd8')
            last = stream_bytes.find(b'\xff\xd9')
            if first != -1 and last != -1:
                jpg = stream_bytes[first:last + 2]
                stream_bytes = stream_bytes[last + 2:]
                image = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)

در این مرحله ما با موفقیت فریم فعلی را خوانده ایم. آن را با استفاده از کد زیر نمایش می دهیم:

cv2.imshow("frame",image)
  • حالا تصویر را پردازش میکنیم. اول یک کپی از تصویر تهیه کرده و ان را به صورت lane_image ذخیره می کنیم.
  • نشانگر قرمز را بررسی کنید ، در صورت موجود بودن, اطلاعات را به pi ارسال کنید تا ماشین متوقف شود.
lane_image=np.copy(image)
lane_image,red=self.checkforred(lane_image)
if red:
    self.sendinfoback(0,0,1)
  • در غیر این صورت برای تشخیص خطوط در قاب فعلی فرایند دیگه ای انجام می شود.
  • تشخیص لبه canny را انجام دهید.
  • منطقه مورد نظر را تعریف کنید.
  • تشخیص HoughLines را انجام دهید.
  • علاوه بر این ، انها را بر اساس شیب خط چپ یا راست طبقه بندی کنید.
  • اطلاعات مشابه را به pi ارسال کنید.
  • در نهایت ، خطوط روی فریم را اضافه کرده و نمایش دهید.
else:
                canny=self.canny(lane_image)
                roi=self.region_of_interest(canny)
                lane=cv2.bitwise_and(canny,roi)
                lines=cv2.HoughLinesP(lane,1,np.pi/180,30,np.array([]),minLineLength=20,maxLineGap=5)                    
                self.average_slope_intercept(lines,lane_image) 
                line_image=self.display_lines(lines,lane_image)                    
                lane_image=cv2.addWeighted(lane_image,1,line_image,1,0)
 
            cv2.imshow('frame',lane_image) #display image    
            key=cv2.waitKey(1) & 0xFF
            if  key == ord('q'):
                send_data_pi.Tcp_Close()
                break       
finally:
    self.connection.close()
    self.server_socket.close()

مرحله ۳: تابعی برای بررسی نشانگر توقف قرمز و ارسال اطلاعات به pi


تابع checkforred () را تعریف کنید که در تابع جریان بالا فراخوانی شده است.
ما کانتور با محدوده HSV را برای رنگ قرمز بررسی می کنیم تا نشانگر مشخص شود. اگر اندازه کانتور بیشتر از مقدار آستانه ما باشد ، وضعیت را روی ۱ تنظیم می کنیم و آن را برمی گردانیم.

def checkforred(self,image):
        font = cv2.FONT_HERSHEY_SIMPLEX
        hsv=cv2.cvtColor(image,cv2.COLOR_BGR2HSV)
        #Red HSV Range
        low_red=np.array([157,56,0])
        high_red=np.array([179,255,255])
         
        mask=cv2.inRange(hsv,low_red,high_red)
        blur=cv2.GaussianBlur(mask,(15,15),0)
        contours,_=cv2.findContours(blur,cv2.RETR_TREE,cv2.CHAIN_APPROX_NONE)
        status=0
        for contour in contours:
            area=cv2.contourArea(contour)
            if area>20000:
                status=1
                cv2.drawContours(image,contour,-1,(0,0,255),3)
                cv2.putText(image,'RED STOP',(240,320), font, 2,(0,0,255),2,cv2.LINE_AA)     
        return (image,status)

اگر قرمز تشخیص داده شد ، این اطلاعات را با استفاده از تابع sendinfoback () به pi میفرستیم. در اینجا ما اطلاعات را به صورت بایت ارسال می کنیم.

def sendinfoback(self,l,r,red):
    D=b''
    D+=bytes([l,r,red]) # leftlines,rightlines,redstatus
    print('here inside sendinfo',D)
    send_data_pi.Tcp_Write(D)

مرحله ۴: تشخیص شیب خطوط و شمارش خطوط چپ و راست

تشخیص لبه Canny:

  • ابتدا باید لبه ها را با استفاده از تشخیص لبه Canny تشخیص دهیم. در اینجا بر اساس تغییر پیکسل مجاور مقادیر در دو جهت x و y تشخیص داده می شوند. ما می توانیم محدوده را با استفاده از مقادیر آستانه پایین و بالا تنظیم کنیم.
def canny(self,image):
        gray=cv2.cvtColor(image,cv2.COLOR_RGB2GRAY)
        blur=cv2.GaussianBlur(gray, (7,7), 0)
        canny=cv2.Canny(blur,50,150)  # lowerThreshold=50 UpperThreshold=150 
        return canny

ناحیه مورد نظر:

در مرحله بعد ما ناحیه مورد نظر را تعریف می کنیم ، به طوری که ما فقط در آن منطقه به دنبال خطوط هستیم(نه در کل فریم).


ناحیه سفید ناحیه مورد نظر ماست

سپس یک عملیات bitwise_and operation روی نتایج بالا انجام میدهیم تا به پایین برسیم:

Houghs Transform:

Hough Line Transfer در Open CV به صورت ()cv2.HoughLinesP است. که از آن برای تشخیص خطوط استفاده می کنیم.

lines=cv2.HoughLinesP(image,ρ accuracy, θ accuracy,threshold,minLinelength,maxLineGap)

lines=cv2.HoughLinesP(lane,1,np.pi/180,30,np.array([]),minLineLength=20,maxLineGap=5)                    

  • lines: برداری که پارامترهای (ρ، θ) خطوط شناسایی شده را ذخیره می کند.
  • ρ: کیفیت پارامتر ρ در پیکسل.
  • θ: کیفیت پارامتر θ در رادیان
  • آستانه: حداقل تعداد تقاطع ها برای تشخیص خط.

سپس با فراخوانی تابع average_slope_intercept () ، شیب خطوط را بررسی می کنیم و تعداد خطوط چپ و راست تشخیص داده شده را به Pi ارسال می کنیم تا ماشین مطابق آن حرکت کند.

def average_slope_intercept(self,lines,image):
    left_fit=[]
    right_fit=[]
    if lines is not None:
        for line in lines:
            x1,y1,x2,y2=line.reshape(4)
            parameters=np.polyfit((x1,x2),(y1,y2),1)
            slope=parameters[0]
            intercept=parameters[1]
            if slope<0:
                right_fit.append((slope,intercept))
            else:
                left_fit.append((slope,intercept))
 
    left_fitavg=np.average(left_fit, axis=0)
    right_fitavg=np.average(right_fit, axis=0)
    print("left slope",left_fitavg,"rigt slope",right_fitavg)
    self.sendinfoback(len(left_fit), len(right_fit),red=0) # Send number of left and right lines detected.

بعد display_lines () را فراخوانی میکنیم تا به مختصات خطوطی را که از تابع HoughsLineP () برای نمایش تصویری دریافت کردیم بپیوندد .

def display_lines(self,lines,image):
    line_image=np.zeros_like(image)
    if lines is not None:
        for line in lines:
            if len(line)>0:                    
                x1,y1,x2,y2=line.reshape(4)
                cv2.line(line_image,(x1,y1),(x2,y2),[0,255,0],10)
    return line_image

در نهایت ، این تصویر را با فریم اصلی برای دید بهتر لبه تشخیص داده شده اضافه کنید.

مرحله ۵:ارسال اطلاعات به pi (تعداد خطوط چپ ، خطوط راست و وضعیت نشانگر قرمز)

ابتدا ، قسمت سرور را برای این (در آدرس پورت جداگانه) می نویسیم ، که آن را در تابع avg_slope_intercept () فراخوانی میکنیم.

این اطلاعات مربوط به تعداد خطوط چپ ، خطوط راست و وضعیت نشانگر رنگ قرمز موجود در فریم فعلی را ارسال می کند.

کد [send_data_pi.py]:

import socket
     
def Tcp_server_wait ( numofclientwait, port ):
    global s2
    s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s2.bind(('',port))
    s2.listen(numofclientwait)
     
def Tcp_server_next ( ):
    global s
    s = s2.accept()[0]
     
def Tcp_Write(D):
   s.send(D + b'\r')
   return
    
def Tcp_Close( ):
   s.close()
   s2.close()
   return
 
Tcp_server_wait ( 5, 17098 ) # This ata is being sent using a different port
Tcp_server_next()

کد بالا را در main.py وارد می کنیم و هر زمان که برای ارسال اطلاعات به pi نیاز باشد ، تابع زیر را فراخوانی می کنیم.

def sendinfoback(self,l,r,red):
    D=b''
    D+=bytes([l,r,red]) # leftlines,rightlines,redstatus
    print('here inside sendinfo',D)
    send_data_pi.Tcp_Write(D)

مرحله ۶:کد سمت مشتری Raspberry pi:

در pi در یک کد جداگانه [main.py]:

  • ابتدا ، اطلاعات ارسال شده از کامپیوتر که حاوی اطلاعاتی در مورد وضعیت نشانگر چپ ، راست و قرمز است را می خوانیم.
  • به علاوه، فاصله از سنسور اولتراسونیک متصل به pi را بدست بیارید.
  • بر اساس این اطلاعات ، ماشین را حرکت دهید.

در ابتداکتاب خانه ها را import میکنیم.

from time import sleep
import CarMove    
import ActionClientRead
from ultrasonic import UltraSonic 
US=UltraSonic()
m1=CarMove.move()

CarMove.py :

این کد شامل اطلاعاتی برای دستکاری حرکت خودرو است:

اتصالات:

می تونید از هر کدام از شش پین GPIO به دلخواه خودتان استفاده کنید.

ما باعث می شویم چرخ های خودرو همیشه به جلو حرکت کنند . فقط با تغییر مقادیر PWM چپ و راست, ما پین های چپ ، راست را کنترل کرده و حرکت را متوقف می کنیم .چون سرعت موتور را تغییر می دهد.

import کردن کتاب خانه ها:

import RPi.GPIO as GPIO          
from time import sleep

پین های GPIO که متصل اند را مشخص کنید:

#Connections from Motor Driver to Pi GPIO
Rin1 = 21
Rin2 = 20
Ren = 12 # Right Enable
Lin1 = 13
Lin2 = 19
Len = 26 #left Enable

پین ها را به عنوان نوع ورودی یا خروجی تنظیم کنید و پین های فعال سازی را مقداری دهی کنید:

initialvaluespeed=30 # This should be between 0 to 100
 
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
#Initialization for right motor
GPIO.setup(Rin1,GPIO.OUT)
GPIO.setup(Rin2,GPIO.OUT)
GPIO.setup(Ren,GPIO.OUT)
GPIO.output(Rin1,GPIO.LOW)
GPIO.output(Rin2,GPIO.LOW)
Rp=GPIO.PWM(Ren,1000)
Rp.start(initialvaluespeed)
#Initialization for left motor
GPIO.setup(Lin1,GPIO.OUT)
GPIO.setup(Lin2,GPIO.OUT)
GPIO.setup(Len,GPIO.OUT)
GPIO.output(Lin1,GPIO.LOW)
GPIO.output(Lin2,GPIO.LOW)
Lp=GPIO.PWM(Len,1000)
Lp.start(initialvaluespeed)

در نهایت یک کلاس ()move برای توابع مختلف حرکت ایجاد کنید:

class move():
    def __init__(self):
        print("starting")
         
    def Rspeed(self,val):
        Rp.ChangeDutyCycle(initialvaluespeed+val)
  
    def Lspeed(self,val):
        Lp.ChangeDutyCycle(initialvaluespeed+val)
 
    def forward(self):
        GPIO.output(Rin2,GPIO.LOW)
        GPIO.output(Rin1,GPIO.HIGH)
        GPIO.output(Lin1,GPIO.LOW)
        GPIO.output(Lin2,GPIO.HIGH)
 
    def escape():
        GPIO.cleanup()

ActionClientRead.py :

این قسمت بایت های ارسال شده در پورت ۱۷۰۹۸ را برای دریافت اطلاعات مربوط به وضعیت نشانگر چپ ، راست ، قرمز می خواند.

import socket, time
def Tcp_connect( HostIp, Port ):
    global s
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((HostIp, Port))
    return
def Tcp_Read():
    a = ''
    b = b''
    while a != b'\r':
        a = s.recv(1)
        b = b + a
    return b
 
def Tcp_Close():
   s.close()
   return
Tcp_connect( '192.168.31.7', 17098) # Replace with Your Server IP,port=17098

Ultrasonic.py:

اتصالات:

در ابتدا کتاب خانه ها را import و پین ها را مشخص میکنیم:

import RPi.GPIO as GPIO
import time
 
GPIO.setmode(GPIO.BCM)
GPIO_TRIGGER = 4
GPIO_ECHO = 17
 
GPIO.setwarnings(False)
GPIO.setup(GPIO_TRIGGER, GPIO.OUT)
GPIO.setup(GPIO_ECHO, GPIO.IN)
GPIO.output(GPIO_TRIGGER, False)
print ("Waiting For Sensor To Settle")
time.sleep(2)

بعد کلاس ()ultraSonic را می سازیم:

class UltraSonic():
     
    def __init__ (self):
        print("started")
 
    def Distance(self):
        #trigger the ultrasonic sensor for a very short period (10us).
        GPIO.output(GPIO_TRIGGER, True)
        time.sleep(0.00001)
        GPIO.output(GPIO_TRIGGER, False)
         
        while GPIO.input(GPIO_ECHO) == 0:
            pass
        StartTime = time.time() #start timer once the pulse is sent completely and echo becomes high or 1
        while GPIO.input(GPIO_ECHO) == 1:
            pass
        StopTime = time.time() #stop the timer once the signal is completely received  and echo again becomes 0
 
        TimeElapsed = StopTime - StartTime # This records the time duration for which echo pin was high 
        speed=34300 #speed of sound in air 343 m/s  or 34300cm/s
        twicedistance = (TimeElapsed * speed) #as time elapsed accounts for amount of time it takes for the pulse to go and come back  
        distance=twicedistance/2  # to get actual distance simply divide it by 2
        time.sleep(.01)
        return round(distance,2) # round off upto 2 decimal points

در سطور بالا ، کتابخانه ها را وارد کردیم. بعد ، در یک حلقه بی نهایت: ابتدا همه اطلاعات (شماره خطوط چپ ، شماره خطوط راست ، وضعیت قرمز ، فاصله اولتراسونیک) را بدست آورده و سپس ماشین را بر این اساس حرکت دهید:

dat=[0,0,0,0,0] # initialize  array to store info
while 1:
    c=0
    D=ActionClientRead.Tcp_Read() # read the data 
    for b in D:
        dat[c]=b
        c+=1
    left=dat[0]  # number of left lines detected
    right=dat[1] # number of right lines detected
    red=dat[2]   # Indicate whether red Color Marker present (1) or Not (0)
 
    dis=US.Distance() #Get current Distance from US sensor
    print("distance=",dis)
    print("left=",left)
    print("right=",right)
     
    speedR,speedL=0,0  # Default forward condition
 
    if red or dis<15 : # Stop the car if condition is true 
        speedR=-1*CarMove.initialvaluespeed
        speedL=-1*CarMove.initialvaluespeed
    elif(left>right): # if left lines are more ==> move left by stopping the left wheel and increasing right wheel speed 
        speedR=10
        speedL=-1*CarMove.initialvaluespeed
    elif(right>left): # if right is more==> move right by stopping the right wheel and increasing left wheel speed
        speedL=10
        speedR=-1*CarMove.initialvaluespeed
 
    m1.Rspeed(speedR)
    m1.Lspeed(speedL)
    m1.forward()

حالا به سادگی ماشین را در یک پیست قرار دهید ، آن را روشن کنید و کد را اجرا کنید!!!

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

قبلا حساب کاربری ایجاد کرده اید؟
گذرواژه خود را فراموش کرده اید؟
Loading...