
El Bruno for Microsoft Azure

Originally published at elbruno.com


#Drone 🚁 – Detect Squirrels 🐿️ and Space Wolves 🐺 from a drone camera using Python 🐍

Hi !

In a couple of hours, I'll host a session where I show how to control a drone using Python 🐍, how to access the drone camera, and how to apply AI on top of the camera feed.

Python SWFL – Let's code a drone to follow faces
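
To make "control a drone using Python" a bit more concrete: the demo further down talks to the drone by sending plain-text commands over UDP to 192.168.10.1:8889 and waiting for a reply such as ok. Here is a minimal sketch of that round trip, not the exact code from the session. The helper name send_command and its timeout parameter are assumptions for illustration; only the address, the port and the "command" message come from the demo.

```python
import socket


def send_command(sock, address, command, timeout=5.0):
    """Send one text command over UDP and return the decoded reply.

    Returns None when no reply arrives within `timeout` seconds.
    `send_command` is a hypothetical helper, not part of any drone SDK.
    """
    sock.settimeout(timeout)
    sock.sendto(command.encode('utf-8'), address)
    try:
        data, _ = sock.recvfrom(1024)
    except socket.timeout:
        return None
    # replies arrive as bytes, so decode them before comparing with 'ok'
    return data.decode('utf-8', errors='ignore').strip()
```

With the computer connected to the drone WiFi, calling send_command(sock, ('192.168.10.1', 8889), 'command') on a UDP socket bound to port 8889 should come back with 'ok'; treat that as an expectation based on the demo, not a guarantee.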

For example, this is the drone doing a simple face detection on me (in my old office):

animation of the drone performing face detection

If you want to know more, I have written a lot about how to do this using Python and OpenCV; please check the references section.

And in the meantime, here is the full final demo.

# Copyright (c) 2022
# Author : Bruno Capuano
# Create Time : 2022 Feb
# Change Log :
# - Open drone camera with openCV
# - Analyze camera frame with local custom vision project running in an app
# - Key D enable / disable object detection
# - On detection enabled
# - Save original image in tmp folder
# - Save image with bounding boxes and detected objects in det folder
# - Save json with bounding boxes and detected objects in det folder
# - Display bounding boxes and detected objects in the camera frame
# - Save a local video with the camera recorded
# - Key T for Take off, L to land; and ASDWRF to control the drone
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import socket
import time
import threading
import cv2
import json
import requests
from flask import Flask, jsonify
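def receiveData():
    # background thread: store the latest reply from the command socket
    global response
    while True:
        try:
            response, _ = clientSocket.recvfrom(1024)
        except Exception:
            break

def readStates():
    # background thread: parse the telemetry string sent on the state port
    global battery, pitch
    while True:
        try:
            response_state, _ = stateSocket.recvfrom(256)
            # the socket returns bytes, so decode before comparing or splitting
            response_state = response_state.decode('ASCII')
            if response_state != 'ok':
                # "name:value;name:value;..." becomes [name, value, name, value, ...]
                states = response_state.replace(';', ':').split(':')
                battery = int(states[21])
                pitch = int(states[1])
        except Exception:
            break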
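def sendCommand(command):
    global response
    # clear the previous reply so the loop below waits for a fresh one
    response = None
    timestamp = int(time.time() * 1000)
    clientSocket.sendto(command.encode('utf-8'), address)
    # wait up to 5 seconds for receiveData() to store the reply
    while response is None:
        if (time.time() * 1000) - timestamp > 5 * 1000:
            return False
        time.sleep(0.01)
    return response

def sendReadCommand(command):
    response = sendCommand(command)
    # replies arrive as bytes; return them as text
    if isinstance(response, bytes):
        response = response.decode('utf-8', errors='ignore').strip()
    return response

def sendControlCommand(command):
    # retry up to 5 times until the drone answers "ok"
    for _ in range(5):
        response = sendCommand(command)
        # replies arrive as bytes, so decode before comparing with a string
        if isinstance(response, bytes):
            response = response.decode('utf-8', errors='ignore').strip()
        if response == 'OK' or response == 'ok':
            return True
    return False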
# -----------------------------------------------
# Local calls
# -----------------------------------------------
probabilityThreshold = 25
def displayPredictions(jsonPrediction, frame):
    # draws on the global resized image `img`; the `frame` argument is not used
    global camera_Width, camera_Heigth
    jsonObj = json.loads(jsonPrediction)
    preds = jsonObj['predictions']
    sorted_preds = sorted(preds, key=lambda x: x['probability'], reverse=True)
    strSortedPreds = ""
    if (sorted_preds):
        detected = False
        for pred in sorted_preds:
            # tag name and prob * 100
            tagName = str(pred['tagName'])
            probability = pred['probability'] * 100
            # apply threshold
            if (probability >= probabilityThreshold):
                detected = True
                bb = pred['boundingBox']
                # the bounding box values are fractions of the frame; adjust to size
                height = int(bb['height'] * camera_Heigth)
                left = int(bb['left'] * camera_Width)
                top = int(bb['top'] * camera_Heigth)
                width = int(bb['width'] * camera_Width)
                # draw bounding boxes
                start_point = (left, top)
                end_point = (left + width, top + height)
                color = (0, 0, 255)
                if(tagName == "squirrel"):
                    color = (0, 255, 0)
                thickness = 2
                cv2.rectangle(img, start_point, end_point, color, thickness)
                # display labels
                start_point_label = (left, top - 5)
                text = "{}: {:.4f}".format(tagName, probability)
                cv2.putText(img, text, start_point_label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
                print(f'{tagName} - {probability}')
                print(f'start point: {start_point} - end point: {end_point}')
                print(jsonPrediction)
        if (detected == True):
            # save the annotated image and the JSON results in the det folder
            detImageFileName = frameImageFileName.replace('tmp', 'det')
            cv2.imwrite(detImageFileName, img)
            detJsonFileName = detImageFileName.replace('png', 'json')
            with open(detJsonFileName, 'w') as save_text:
                save_text.write(jsonPrediction)
    return strSortedPreds
# instantiate flask app and push a context
app = Flask(__name__)
# -----------------------------------------------
# Main program
# -----------------------------------------------
# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))
# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()
# connect to drone
response = sendControlCommand("command")
print(f'command response: {response}')
response = sendControlCommand("streamon")
print(f'streamon response: {response}')
# drone information
battery = 0
pitch = 0
flyUnit = 50
# camera frame size
camera_Width = 640 #1280
camera_Heigth = 480 #960
camera_Size = (camera_Width, camera_Heigth)
# open UDP
print(f'opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)
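# open video writer to save video (the "videos" folder must already exist)
# the mp4v codec matches the .mp4 container used for the output file
vid_cod = cv2.VideoWriter_fourcc(*'mp4v')
vid_output = cv2.VideoWriter("videos/dronecam_video.mp4", vid_cod, 20.0, camera_Size)
# main loop state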
drone_flying = False
detectionEnabled = False
i = 0
while True:
    i = i + 1
    imgNumber = str(i).zfill(5)
    start_time = time.time()
    sendReadCommand('battery?')
    print(f'battery: {battery} % - pitch: {pitch} - i: {imgNumber}')
    try:
        ret, frame = cap.read()
        img = cv2.resize(frame, camera_Size)
        if (detectionEnabled):
            # save image to disk and open it
            frameImageFileName = f'tmp/image{imgNumber}.png'
            cv2.imwrite(frameImageFileName, img)
            with open(frameImageFileName, 'rb') as f:
                img_data = f.read()
            # analyze file in local container
            api_url = "http://127.0.0.1:80/image"
            r = requests.post(api_url, data=img_data)
            with app.app_context():
                jsonResults = jsonify(r.json())
            jsonStr = jsonResults.get_data(as_text=True)
            displayPredictions(jsonStr, frame)
        fpsInfo = ""
        if (time.time() - start_time ) > 0:
            fpsInfo = "FPS: " + str(1.0 / (time.time() - start_time)) # FPS = 1 / time to process loop
            font = cv2.FONT_HERSHEY_DUPLEX
            cv2.putText(img, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)
        cv2.imshow('@elbruno - DJI Tello Camera', img)
        vid_output.write(img)
    except Exception as e:
        detectionEnabled = False
        print(f'exc: {e}')
    # key controller: read the key once per frame; calling cv2.waitKey() again
    # for every check would wait for a new key event each time
    key = cv2.waitKey(1) & 0xFF
    if key == ord("d"):
        detectionEnabled = not detectionEnabled
    if key == ord('t'):
        drone_flying = True
        detection_started = True
        msg = "takeoff"
        sendCommand(msg)
    if key == ord('l'):
        drone_flying = False
        msg = "land"
        sendCommand(msg)
        time.sleep(5)
    if key == ord('w') and drone_flying == True:
        msg = f"up {flyUnit}"
        sendCommand(msg)
        time.sleep(1)
    if key == ord('s') and drone_flying == True:
        msg = f"down {flyUnit}"
        sendCommand(msg)
        time.sleep(1)
    if key == ord('a') and drone_flying == True:
        msg = f"left {flyUnit}"
        sendCommand(msg)
        time.sleep(1)
    # note: "d" also toggles object detection above
    if key == ord('d') and drone_flying == True:
        msg = f"right {flyUnit}"
        sendCommand(msg)
        time.sleep(1)
    if key == ord('r') and drone_flying == True:
        msg = f"forward {flyUnit}"
        sendCommand(msg)
        time.sleep(1)
    if key == ord('f') and drone_flying == True:
        msg = f"back {flyUnit}"
        sendCommand(msg)
        time.sleep(1)
    if key == ord("q"):
        break
response = sendControlCommand("streamoff")
print(f'streamoff response: {response}')
# close the already opened camera, and the video file
cap.release()
vid_output.release()
cv2.destroyAllWindows()
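
The part of the demo that usually needs a second look is how a prediction turns into a rectangle on the frame: the bounding box values in the JSON are fractions of the frame size, so they are multiplied by the frame width and height, and anything below the probability threshold is skipped. Here is a small sketch of just that step as a pure function, without OpenCV or the drone. The function name predictions_to_boxes and the sample JSON values are assumptions for illustration; the field names (predictions, tagName, probability, boundingBox) and the threshold of 25 are the ones used in the demo.

```python
import json


def predictions_to_boxes(json_prediction, frame_width, frame_height, threshold_percent=25):
    """Return (tag, probability_percent, start_point, end_point) tuples for the
    predictions at or above the threshold, highest probability first.

    Hypothetical helper: it mirrors the scaling done in displayPredictions().
    """
    predictions = json.loads(json_prediction)['predictions']
    boxes = []
    for pred in sorted(predictions, key=lambda p: p['probability'], reverse=True):
        probability = pred['probability'] * 100
        if probability < threshold_percent:
            continue
        bb = pred['boundingBox']
        # scale the fractional box to pixel coordinates
        left = int(bb['left'] * frame_width)
        top = int(bb['top'] * frame_height)
        width = int(bb['width'] * frame_width)
        height = int(bb['height'] * frame_height)
        boxes.append((pred['tagName'], probability, (left, top), (left + width, top + height)))
    return boxes


# Invented sample, shaped like the JSON the demo reads (values are not real results)
sample = json.dumps({'predictions': [
    {'tagName': 'squirrel', 'probability': 0.5,
     'boundingBox': {'left': 0.25, 'top': 0.5, 'width': 0.5, 'height': 0.25}},
    {'tagName': 'wolf', 'probability': 0.125,
     'boundingBox': {'left': 0.0, 'top': 0.0, 'width': 0.5, 'height': 0.5}},
]})

for tag, probability, start_point, end_point in predictions_to_boxes(sample, 640, 480):
    # prints: squirrel: 50.0% from (160, 240) to (480, 360)
    print(f'{tag}: {probability:.1f}% from {start_point} to {end_point}')
```

The two points returned for each box are in the form that cv2.rectangle expects for its corner arguments, so the drawing code can stay as it is in the demo.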

Happy coding!

Greetings

El Bruno


Coding4Fun Drone 🚁 posts

  1. Introduction to DJI Tello
  2. Analyzing Python samples code from the official SDK
  3. Drone Hello World ! Takeoff and land
  4. Tips to connect to Drone WiFi in Windows 10
  5. Reading data from the Drone, Get battery level
  6. Sample for real time data read, Get Accelerometer data
  7. How the drone camera video feed works, using FFMPEG to display the feed
  8. Open the drone camera video feed using OpenCV
  9. Performance and OpenCV, measuring FPS
  10. Detect faces using the drone camera
  11. Detect a banana and land!
  12. Flip when a face is detected!
  13. How to connect to Internet and to the drone at the same time
  14. Video with real time demo using the drone, Python and Visual Studio Code
  15. Using custom vision to analyze drone camera images
  16. Drawing frames for detected objects in real-time in the drone camera feed
  17. Save detected objects to local files, images and JSON results
  18. Save the Drone camera feed into a local video file
  19. Overlay images into the Drone camera feed using OpenCV
  20. Instance Segmentation from the Drone Camera using OpenCV, TensorFlow and PixelLib
  21. Create a 3×3 grid on the camera frame to detect objects and calculate positions in the grid
  22. Create an Azure IoT Central Device Template to work with drone information
  23. Create a Drone Device for Azure IoT Central
  24. Send drone information to Azure IoT Central
