一. 前言
我们决定使用OAK相机采集RGB图像,识别人体姿态并将其转换为英文字母,再作为控制信号通过socket发送到Unity3D中对小车进行控制。
二. 代码功能模块
1. socket通信
函数 socket.socket 创建一个 socket,该函数带有Address Family和Type两个参数。
Address Family:可以选择AF_INET(用于 Internet 进程间通信) 或者AF_UNIX(用于同一台机器进程间通信),实际工作中常用AF_INET。
Type:套接字类型,可以是SOCK_STREAM(流式套接字,主要用于 TCP 协议)或者SOCK_DGRAM(数据报套接字,主要用于 UDP 协议)
sk.listen(backlog)开始监听传入连接。backlog指定在拒绝连接之前,可以挂起的最大连接数量。backlog等于5,表示内核已经接到了连接请求,但服务器还没有调用accept进行处理的连接个数最大为5。这个值不能无限大,因为要在内核中维护连接队列。
def connectServer():
    """Create a TCP server socket, wait for one client, and return both.

    Binds to the module-level (HOST, PORT), listens, and blocks until a
    single client (the Unity3D side) connects.

    Returns:
        (conn, sk): the accepted client connection and the listening socket.

    Exits the process if binding fails (address in use / unreachable).
    """
    sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow quick restarts: without SO_REUSEADDR a recently closed server
    # leaves the port in TIME_WAIT and bind() fails for a couple of minutes.
    sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    print("Socket created")
    try:
        sk.bind((HOST, PORT))
    except socket.error as msg:
        print(msg)
        sk.close()  # release the descriptor before exiting
        sys.exit()
    print("Socket bind complete")
    sk.listen(10)
    conn, addr = sk.accept()
    print("Connected with " + addr[0] + ":" + str(addr[1]))
    return conn, sk
2. 将人体姿态转换为英文字母
MovenetDepthai.py模块采用了COCO数据集所定义的关键点评估指标。
# Lookup table mapping arm poses to control letters.
# Key: (right_arm_octant, left_arm_octant) — each arm angle is quantized
# into one of 8 octants of 45 degrees by recognize_gesture().
# W/A/S/D etc. are the command characters sent to Unity3D; '-' marks a
# recognized pose combination that carries no command.
semaphore_flag = {
(3, 4): 'W', (2, 4): '-', (1, 4): 'A', (0, 4): '-',
(4, 7): 'S', (4, 6): '-', (4, 5): 'D', (2, 3): '-',
(0, 3): 'Q', (0, 6): '-', (3, 0): 'E', (3, 7): '-',
(3, 6): 'J', (3, 5): '-', (2, 1): 'K', (2, 0): '-',
(2, 7): 'P', (2, 6): '-', (2, 5): 'L', (1, 0): '-',
(1, 7): 'U', (0, 5): '-', (7, 6): 'F', (7, 5): '-',
(1, 6): '-', (5, 6): '-',
}
- 将思路转换为代码
def recognize_gesture(body):
    """Map the positions of both upper arms to a semaphore letter.

    Args:
        body: pose-estimation result exposing `keypoints`, `scores` and
            `score_thresh` (from MovenetDepthai).

    Returns:
        A string such as "W34" (command letter followed by the right and
        left octant digits) when the pose matches `semaphore_flag`,
        otherwise None.
    """
    def angle_with_y(v):
        # Angle of vector v with the y axis, in degrees.
        # BUG FIX: atan2 already handles v[1] == 0 correctly (±90°
        # depending on the sign of v[0]); the old special case returned
        # +90 even for a vector pointing the opposite way.
        return degrees(atan2(v[0], v[1]))

    # Require confident detections of both shoulders and elbows.
    needed = ('right_elbow', 'right_shoulder', 'left_elbow', 'left_shoulder')
    if any(body.scores[KEYPOINT_DICT[k]] < body.score_thresh for k in needed):
        return None

    right_arm_angle = angle_with_y(
        body.keypoints[KEYPOINT_DICT['right_elbow']]
        - body.keypoints[KEYPOINT_DICT['right_shoulder']])
    left_arm_angle = angle_with_y(
        body.keypoints[KEYPOINT_DICT['left_elbow']]
        - body.keypoints[KEYPOINT_DICT['left_shoulder']])

    # Quantize each arm angle into one of 8 octants of 45 degrees.
    right_pose = int((right_arm_angle + 202.5) / 45) % 8
    left_pose = int((left_arm_angle + 202.5) / 45) % 8

    letter = semaphore_flag.get((right_pose, left_pose))
    if letter is not None:
        # Append the octant digits for on-screen debugging.
        letter = letter + str(right_pose) + str(left_pose)
    return letter
3. 对视频的每一帧进行识别
使用MovenetDepthai模块进行人体姿态识别,并将识别结果实时渲染到采集的视频画面中,然后在画面中加上文字表示的识别结果。
# Per-frame loop: estimate the pose, overlay the result, and forward the
# command character to the Unity3D client.
while True:
    frame, body = pose.next_frame()
    if frame is None:
        break
    frame = renderer.draw(frame, body)
    letter = recognize_gesture(body)
    if letter:
        cv2.putText(frame, letter, (frame.shape[1] // 2, 100),
                    cv2.FONT_HERSHEY_PLAIN, 5, (0, 190, 255), 3)
        # Send only the command character; the octant digits are debug info.
        conn.sendall(letter[0:1].encode())
        # BUG FIX: `letter` is e.g. "F76" (letter + pose digits), so the
        # original `letter == "F"` comparison never matched and the stop
        # gesture could not end the session.
        if letter[0] == "F":
            conn.close()
            sk.close()
            break
    key = renderer.waitKey(delay=1)
三. 整体代码
import cv2
import sys
import socket
import argparse
from math import atan2, degrees
from MovenetDepthai import MovenetDepthai, KEYPOINT_DICT
from MovenetRenderer import MovenetRenderer
sys.path.append("")  # "" makes the current working directory importable
HOST = "10.27.209.121"  # IP address the server binds to; Unity3D connects here
PORT = 8888  # TCP port of the control channel
# Lookup table mapping arm poses to control letters.
# Key: (right_arm_octant, left_arm_octant) — each arm angle is quantized
# into one of 8 octants of 45 degrees by recognize_gesture().
# W/A/S/D etc. are the command characters sent to Unity3D; '-' marks a
# recognized pose combination that carries no command.
semaphore_flag = {
(3, 4): 'W', (2, 4): '-', (1, 4): 'A', (0, 4): '-',
(4, 7): 'S', (4, 6): '-', (4, 5): 'D', (2, 3): '-',
(0, 3): 'Q', (0, 6): '-', (3, 0): 'E', (3, 7): '-',
(3, 6): 'J', (3, 5): '-', (2, 1): 'K', (2, 0): '-',
(2, 7): 'P', (2, 6): '-', (2, 5): 'L', (1, 0): '-',
(1, 7): 'U', (0, 5): '-', (7, 6): 'F', (7, 5): '-',
(1, 6): '-', (5, 6): '-',
}
def connectServer():
    """Create a TCP server socket, wait for one client, and return both.

    Binds to the module-level (HOST, PORT), listens, and blocks until a
    single client (the Unity3D side) connects.

    Returns:
        (conn, sk): the accepted client connection and the listening socket.

    Exits the process if binding fails (address in use / unreachable).
    """
    sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow quick restarts: without SO_REUSEADDR a recently closed server
    # leaves the port in TIME_WAIT and bind() fails for a couple of minutes.
    sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    print("Socket created")
    try:
        sk.bind((HOST, PORT))
    except socket.error as msg:
        print(msg)
        sk.close()  # release the descriptor before exiting
        sys.exit()
    print("Socket bind complete")
    sk.listen(10)
    conn, addr = sk.accept()
    print("Connected with " + addr[0] + ":" + str(addr[1]))
    return conn, sk
def recognize_gesture(body):
    """Map the positions of both upper arms to a semaphore letter.

    Args:
        body: pose-estimation result exposing `keypoints`, `scores` and
            `score_thresh` (from MovenetDepthai).

    Returns:
        A string such as "W34" (command letter followed by the right and
        left octant digits) when the pose matches `semaphore_flag`,
        otherwise None.
    """
    def angle_with_y(v):
        # Angle of vector v with the y axis, in degrees.
        # BUG FIX: atan2 already handles v[1] == 0 correctly (±90°
        # depending on the sign of v[0]); the old special case returned
        # +90 even for a vector pointing the opposite way.
        return degrees(atan2(v[0], v[1]))

    # Require confident detections of both shoulders and elbows.
    needed = ('right_elbow', 'right_shoulder', 'left_elbow', 'left_shoulder')
    if any(body.scores[KEYPOINT_DICT[k]] < body.score_thresh for k in needed):
        return None

    right_arm_angle = angle_with_y(
        body.keypoints[KEYPOINT_DICT['right_elbow']]
        - body.keypoints[KEYPOINT_DICT['right_shoulder']])
    left_arm_angle = angle_with_y(
        body.keypoints[KEYPOINT_DICT['left_elbow']]
        - body.keypoints[KEYPOINT_DICT['left_shoulder']])

    # Quantize each arm angle into one of 8 octants of 45 degrees.
    right_pose = int((right_arm_angle + 202.5) / 45) % 8
    left_pose = int((left_arm_angle + 202.5) / 45) % 8

    letter = semaphore_flag.get((right_pose, left_pose))
    if letter is not None:
        # Append the octant digits for on-screen debugging.
        letter = letter + str(right_pose) + str(left_pose)
    return letter
# ---- Script entry: parse CLI options, start the pipeline, serve Unity3D ----
parser = argparse.ArgumentParser()
parser.add_argument("-m", "--model", type=str, choices=['lightning', 'thunder'],
                    default='thunder',
                    help="Model to use (default=%(default)s)")  # fixed unclosed paren
parser.add_argument('-i', '--input', type=str, default='rgb',
                    help="'rgb' or 'rgb_laconic' or path to video/image file to use as input (default: %(default)s)")
parser.add_argument("-o", "--output",
                    help="Path to output video file")
args = parser.parse_args()

pose = MovenetDepthai(input_src=args.input, model=args.model)
renderer = MovenetRenderer(pose, output=args.output)
conn, sk = connectServer()

try:
    # Per-frame loop: estimate the pose, overlay the result, and forward
    # the command character to the Unity3D client.
    while True:
        frame, body = pose.next_frame()
        if frame is None:
            break
        frame = renderer.draw(frame, body)
        letter = recognize_gesture(body)
        if letter:
            cv2.putText(frame, letter, (frame.shape[1] // 2, 100),
                        cv2.FONT_HERSHEY_PLAIN, 5, (0, 190, 255), 3)
            # Send only the command character; the octant digits are debug info.
            conn.sendall(letter[0:1].encode())
            # BUG FIX: `letter` is e.g. "F76" (letter + pose digits), so the
            # original `letter == "F"` comparison never matched and the stop
            # gesture could not end the session.
            if letter[0] == "F":
                break
        key = renderer.waitKey(delay=1)
finally:
    # BUG FIX: the original leaked the sockets when the loop ended because
    # the camera stream ran out (frame is None). socket.close() is
    # idempotent, so closing here is safe on every exit path.
    conn.close()
    sk.close()
    renderer.exit()
    pose.exit()