Hi !
In a couple of hours, I'll host a session where I show how to control a drone using Python, and also how to access the drone camera and apply AI on top of the camera feed.
Python SWFL – Let's code a drone to follow faces
For example, this is the drone doing a simple face detection on me (in my old office).
If you want to know more, I write a lot about how to do this using Python and OpenCV, please check the references section.
And in the meantime, here is the full final demo.
# Copyright (c) 2022 | |
# Author : Bruno Capuano | |
# Create Time : 2022 Feb | |
# Change Log : | |
# - Open drone camera with openCV | |
# - Analyze camera frame with local custom vision project running in an app | |
# - Key D enable / disable object detection | |
# - On detection enabled | |
# - Save original image in tmp folder | |
# - Save image with bounding boxes and detected objects in det folder | |
# - Save json with bounding boxes and detected objects in det folder | |
# - Display bounding boxes and detected objects in the camera frame | |
# - Save a local video with the camera recorded | |
# - Key T for Take off, L to land; and ASDWRF to control the drone | |
# | |
# The MIT License (MIT) | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
# THE SOFTWARE. | |
import json
import os
import socket
import threading
import time

import cv2
import requests
from flask import Flask, jsonify
def receiveData():
    """Receive command replies from the drone until the socket fails.

    Intended to run as the target of a background thread. Every datagram
    read from ``clientSocket`` (at most 1024 bytes) is stored, as raw
    bytes, in the module-level ``response`` variable, which is where
    ``sendCommand`` looks for the reply.

    The loop ends when the socket raises ``OSError`` (for example after it
    has been closed).
    """
    global response
    while True:
        try:
            response, _ = clientSocket.recvfrom(1024)
        except OSError:
            # Socket closed or network failure: stop the receiver thread.
            break
def readStates():
    """Keep the module-level ``battery`` and ``pitch`` values up to date.

    Intended to run as the target of a background thread. Each datagram
    read from ``stateSocket`` (at most 256 bytes) is decoded as ASCII and
    split on ``;`` and ``:``; field 1 is stored as ``pitch`` and field 21
    as ``battery``.
    # NOTE(review): the field positions assume the Tello state string
    # layout "pitch:..;roll:..; ... ;bat:..;" - confirm against the SDK.

    Packets that cannot be parsed (too short, non-numeric, not ASCII) are
    skipped so that a single bad datagram does not stop the updates. The
    loop ends when the socket raises ``OSError``.
    """
    global battery, pitch
    while True:
        try:
            raw_state, _ = stateSocket.recvfrom(256)
        except OSError:
            # Socket closed or network failure: stop the state thread.
            break

        try:
            fields = raw_state.decode('ASCII').replace(';', ':').split(':')
            new_pitch = int(fields[1])
            new_battery = int(fields[21])
        except (UnicodeDecodeError, ValueError, IndexError):
            # Malformed or unexpected packet (e.g. a bare "ok"): ignore it.
            continue

        # Publish both values only after the whole packet parsed cleanly.
        pitch = new_pitch
        battery = new_battery
def sendCommand(command):
    """Send a text command to the drone and wait for its reply.

    The module-level ``response`` is cleared before the command is sent,
    so a reply left over from an earlier command (or any other value that
    was stored in ``response``) is never mistaken for the answer to this
    one.

    Args:
        command: Command text; it is UTF-8 encoded and sent to ``address``
            through ``clientSocket``.

    Returns:
        The value the receiver thread stored in ``response`` (raw bytes as
        delivered by the socket), or ``False`` if nothing arrived within
        5 seconds.
    """
    global response
    response = None
    deadline = time.time() + 5
    clientSocket.sendto(command.encode('utf-8'), address)
    while response is None:
        if time.time() > deadline:
            return False
        # Yield the CPU instead of spinning while the receiver thread waits.
        time.sleep(0.01)
    return response
def sendReadCommand(command):
    """Send a read command and return the reply as text.

    Args:
        command: Command text passed unchanged to ``sendCommand``.

    Returns:
        The reply as a ``str``. A bytes reply is decoded as UTF-8 (invalid
        bytes are replaced) and stripped of surrounding whitespace, rather
        than being rendered as ``"b'...'"``. Any other value, such as the
        ``False`` returned by ``sendCommand`` on timeout, is converted
        with ``str()``.
    """
    reply = sendCommand(command)
    if isinstance(reply, (bytes, bytearray)):
        return reply.decode('utf-8', errors='replace').strip()
    return str(reply)
def sendControlCommand(command):
    """Send a control command, retrying until the drone acknowledges it.

    The command is sent through ``sendCommand`` up to 5 times. A reply
    counts as an acknowledgement when, after decoding bytes as UTF-8 and
    stripping whitespace, it equals ``ok`` ignoring case.

    Args:
        command: Command text passed unchanged to ``sendCommand``.

    Returns:
        ``True`` as soon as an acknowledgement is received, ``False`` if
        all 5 attempts fail or time out.
    """
    for _ in range(5):
        reply = sendCommand(command)
        # Replies arrive as bytes from the socket; comparing them directly
        # with a str would never match.
        if isinstance(reply, (bytes, bytearray)):
            reply = reply.decode('utf-8', errors='replace')
        if isinstance(reply, str) and reply.strip().lower() == 'ok':
            return True
    return False
# ----------------------------------------------- | |
# Local calls | |
# ----------------------------------------------- | |
# Minimum probability, in percent, for a prediction to be drawn and saved.
probabilityThreshold = 25


def displayPredictions(jsonPrediction, frame):
    """Draw the predictions above the threshold and save the detection.

    The function works on module-level state set by the main loop: boxes
    and labels are drawn on ``img``, box coordinates are scaled with
    ``camera_Width`` / ``camera_Heigth``, and output file names are derived
    from ``frameImageFileName``.

    When at least one prediction reaches ``probabilityThreshold``, the
    annotated image is written to the file name obtained by replacing
    ``tmp`` with ``det`` in ``frameImageFileName``, and ``jsonPrediction``
    is written next to it with ``png`` replaced by ``json``.

    Args:
        jsonPrediction: JSON text with a ``predictions`` list; each entry
            provides ``tagName``, ``probability`` and a ``boundingBox``
            with ``left``, ``top``, ``width`` and ``height``.
            # NOTE(review): probability and box values are presumably
            # normalised to 0-1 - confirm against the prediction service.
        frame: Accepted for interface compatibility; not used.

    Returns:
        Always an empty string.
    """
    predictions = json.loads(jsonPrediction)['predictions']
    if not predictions:
        return ""

    # Highest probability first, so the console output is ordered.
    predictions = sorted(predictions, key=lambda p: p['probability'], reverse=True)

    detected = False
    for pred in predictions:
        tagName = str(pred['tagName'])
        probability = pred['probability'] * 100
        if probability < probabilityThreshold:
            continue
        detected = True

        # Scale the bounding box to the displayed image size.
        bb = pred['boundingBox']
        left = int(bb['left'] * camera_Width)
        top = int(bb['top'] * camera_Heigth)
        width = int(bb['width'] * camera_Width)
        height = int(bb['height'] * camera_Heigth)

        start_point = (left, top)
        end_point = (left + width, top + height)
        # Colours are BGR: red by default, green for the "squirrel" tag.
        color = (0, 255, 0) if tagName == "squirrel" else (0, 0, 255)
        cv2.rectangle(img, start_point, end_point, color, 2)

        # Label just above the box.
        text = "{}: {:.4f}".format(tagName, probability)
        cv2.putText(img, text, (left, top - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        print(f'{tagName} - {probability}')
        print(f'start point: {start_point} - end point: {end_point}')

    print(jsonPrediction)

    if detected:
        detImageFileName = frameImageFileName.replace('tmp', 'det')
        cv2.imwrite(detImageFileName, img)
        detJsonFileName = detImageFileName.replace('png', 'json')
        # Context manager guarantees the file is closed even if the write fails.
        with open(detJsonFileName, 'w') as jsonFile:
            jsonFile.write(jsonPrediction)

    return ""
# Flask app, used only to obtain an application context for jsonify()
app = Flask(__name__)

# -----------------------------------------------
# Main program
# -----------------------------------------------

# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None

# drone information; initialised before the reader threads start so that a
# value published by the state thread is never reset to 0 afterwards
battery = 0
pitch = 0
flyUnit = 50

clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))

# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()

# connect to drone; the results go into their own names because the global
# 'response' is the mailbox shared between receiveData and sendCommand
commandOk = sendControlCommand("command")
print(f'command response: {commandOk}')
streamOnOk = sendControlCommand("streamon")
print(f'streamon response: {streamOnOk}')

# camera frame size used for display, detection and the recorded video
camera_Width = 640  # 1280
camera_Heigth = 480  # 960
camera_Size = (camera_Width, camera_Heigth)

# make sure the output folders exist; cv2.imwrite and cv2.VideoWriter do not
# create missing folders
for outputFolder in ('tmp', 'det', 'videos'):
    os.makedirs(outputFolder, exist_ok=True)

# open UDP video feed
print(f'opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)

# open video writer to save video
vid_cod = cv2.VideoWriter_fourcc(*'XVID')
vid_output = cv2.VideoWriter("videos/dronecam_video.mp4", vid_cod, 20.0, camera_Size)

# movement keys -> drone command; only honoured while the drone is flying
# NOTE(review): 'd' is also the detection toggle, so pressing it while flying
# toggles detection AND moves right - consider remapping one of the two.
moveCommands = {
    ord('w'): 'up',
    ord('s'): 'down',
    ord('a'): 'left',
    ord('d'): 'right',
    ord('r'): 'forward',
    ord('f'): 'back',
}

drone_flying = False
detectionEnabled = False
i = 0
try:
    while True:
        i = i + 1
        imgNumber = str(i).zfill(5)
        start_time = time.time()
        sendReadCommand('battery?')
        print(f'battery: {battery} % - pitch: {pitch} - i: {imgNumber}')

        try:
            ret, frame = cap.read()
            if not ret or frame is None:
                # no frame available yet (or a dropped one): skip processing
                print('exc: no frame received from the video feed')
            else:
                img = cv2.resize(frame, camera_Size)

                if detectionEnabled:
                    # save image to disk and open it
                    frameImageFileName = os.path.join('tmp', f'image{imgNumber}.png')
                    cv2.imwrite(frameImageFileName, img)
                    with open(frameImageFileName, 'rb') as f:
                        img_data = f.read()

                    # analyze file in local container; the timeout keeps a
                    # stalled service from freezing the control loop
                    api_url = "http://127.0.0.1:80/image"
                    r = requests.post(api_url, data=img_data, timeout=5)
                    with app.app_context():
                        jsonResults = jsonify(r.json())
                    jsonStr = jsonResults.get_data(as_text=True)
                    displayPredictions(jsonStr, frame)

                # FPS = 1 / time to process loop
                elapsed = time.time() - start_time
                fpsInfo = ""
                if elapsed > 0:
                    fpsInfo = "FPS: " + str(1.0 / elapsed)
                font = cv2.FONT_HERSHEY_DUPLEX
                cv2.putText(img, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)
                cv2.imshow('@elbruno - DJI Tello Camera', img)
                vid_output.write(img)
        except Exception as e:
            # any failure while processing a frame switches detection off
            detectionEnabled = False
            print(f'exc: {e}')

        # key controller: poll the keyboard ONCE per iteration; every extra
        # cv2.waitKey() call would consume (and usually drop) the key press
        key = cv2.waitKey(1) & 0xFF

        if key == ord('d'):
            detectionEnabled = not detectionEnabled

        if key == ord('t'):
            drone_flying = True
            detection_started = True
            sendCommand("takeoff")
        elif key == ord('l'):
            drone_flying = False
            sendCommand("land")
            time.sleep(5)
        elif drone_flying and key in moveCommands:
            sendCommand(f"{moveCommands[key]} {flyUnit}")
            time.sleep(1)

        if key == ord('q'):
            break
finally:
    # always stop the stream and release the camera, the video file and the
    # window, even when the loop ends because of an error or Ctrl+C
    streamOffOk = sendControlCommand("streamoff")
    print(f'streamoff response: {streamOffOk}')

    cap.release()
    vid_output.release()
    cv2.destroyAllWindows()
Happy coding!
Greetings
El Bruno
Coding4Fun Drone posts
- Introduction to DJI Tello
- Analyzing Python samples code from the official SDK
- Drone Hello World ! Takeoff and land
- Tips to connect to Drone WiFi in Windows 10
- Reading data from the Drone, Get battery level
- Sample for real time data read, Get Accelerometer data
- How the drone camera video feed works, using FFMPEG to display the feed
- Open the drone camera video feed using OpenCV
- Performance and OpenCV, measuring FPS
- Detect faces using the drone camera
- Detect a banana and land!
- Flip when a face is detected!
- How to connect to Internet and to the drone at the same time
- Video with real time demo using the drone, Python and Visual Studio Code
- Using custom vision to analyze drone camera images
- Drawing frames for detected objects in real-time in the drone camera feed
- Save detected objects to local files, images and JSON results
- Save the Drone camera feed into a local video file
- Overlay images into the Drone camera feed using OpenCV
- Instance Segmentation from the Drone Camera using OpenCV, TensorFlow and PixelLib
- Create a 3×3 grid on the camera frame to detect objects and calculate positions in the grid
- Create an Azure IoT Central Device Template to work with drone information
- Create a Drone Device for Azure IoT Central
- Send drone information to Azure IoT Central
Top comments (0)