DEV Community

DEDIPYA SADE
DEDIPYA SADE

Posted on

SIH code

python3 -m venv venv
source venv/bin/activate
pip install opencv-python-headless numpy flask matplotlib scipy pillow

!/usr/bin/env python3

"""
robot.py
Single-file GPS-denied robot demo:

  • EKF (wheel odom + IMU, occasional VO updates)
  • Visual Odometry (ORB)
  • Occupancy Grid mapping (log-odds)
  • A* planner on the occupancy grid
  • Pure Pursuit controller + safety stop
  • Simple Flask dashboard showing occupancy, pose, sensors Sensor read functions are stubbed/simulated — replace with real drivers on real hardware. """

import time
import threading
import math
import io
import base64
from typing import List, Tuple

import numpy as np
import cv2
from flask import Flask, Response, render_template_string, jsonify
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from PIL import Image
import heapq

---------------------------

Sensor stubs / wrappers

---------------------------

class SensorHealth:
def init(self):
t = time.time()
self.last_imu = t
self.last_odom = t
self.last_cam = t
self.last_lidar = t

def update(self, imu=False, odom=False, cam=False, lidar=False):
now = time.time()
if imu: self.last_imu = now
if odom: self.last_odom = now
if cam: self.last_cam = now
if lidar: self.last_lidar = now

def ok(self, timeout=1.0):
now = time.time()
return {
'imu': (now - self.last_imu) < timeout,
'odom': (now - self.last_odom) < timeout,
'cam': (now - self.last_cam) < timeout,
'lidar': (now - self.last_lidar) < timeout
}




Replace these with real sensor calls on hardware

def read_imu():
"""
Return (wz, ax) = (angular velocity z in rad/s, linear accel x in m/s^2)
Replace with real IMU reading (e.g., MPU driver).
"""
wz = np.random.normal(0.0, 0.02)
ax = np.random.normal(0.0, 0.1)
return float(wz), float(ax)

def read_wheel_odometry():
"""
Return (delta_distance_m, delta_heading_rad) since last call.
Replace with encoder integration.
"""
# Simulated small forward motion per loop (for demo)
d = 0.02 + np.random.normal(0.0, 0.004)
dtheta = np.random.normal(0.0, 0.004)
return float(d), float(dtheta)

def read_lidar_scan(num_beams=180, max_range=5.0):
"""
Return (ranges, angles) arrays.
Replace with real lidar driver output.
This stub simulates a front obstacle.
"""
angles = np.linspace(-np.pi/2, np.pi/2, num_beams)
ranges = max_range * np.ones_like(angles)
center_idx = num_beams // 2
ranges[center_idx-3:center_idx+4] = 1.5 + np.random.normal(0.0, 0.03, 7)
ranges += np.random.normal(0.0, 0.02, size=angles.shape)
ranges = np.clip(ranges, 0.02, max_range)
return ranges.astype(float), angles.astype(float)

class CameraWrapper:
def init(self, source=0, width=320, height=240):
self.cap = cv2.VideoCapture(source)
# attempt to set size (may fail on some cameras)
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
self.last_frame = None

def read(self):
ret, frame = self.cap.read()
if not ret or frame is None:
# return blank frame to avoid breaking VO
frame = np.zeros((240,320,3), dtype=np.uint8)
self.last_frame = frame
return frame

def release(self):
self.cap.release()




EKF (3-state)

---------------------------

class EKF:
def init(self):
self.x = np.zeros((3,1)) # x, y, theta
self.P = np.diag([0.1, 0.1, 0.05])
# small process noise
self.Q = np.diag([0.02, 0.02, 0.01])

def predict(self, delta_dist, delta_theta):
x, y, th = self.x.flatten()
th_new = th + delta_theta
x_new = x + delta_dist * np.cos(th_new)
y_new = y + delta_dist * np.sin(th_new)
self.x = np.array([[x_new],[y_new],[th_new]])
# simple covariance increase
self.P = self.P + self.Q

def update_pose(self, z, R):
"""
z: 3x1 measurement
R: 3x3 measurement covariance
"""
H = np.eye(3)
y = z - H.dot(self.x)
# normalize angle residual
y[2] = (y[2] + np.pi) % (2*np.pi) - np.pi
S = H.dot(self.P).dot(H.T) + R
K = self.P.dot(H.T).dot(np.linalg.inv(S))
self.x = self.x + K.dot(y)
self.P = (np.eye(3) - K.dot(H)).dot(self.P)

def get_state(self):
return self.x.flatten()




Visual Odometry (ORB)

---------------------------

class VisualOdometry:
def init(self, max_features=1000, focal=300.0):
self.orb = cv2.ORB_create(max_features)
self.bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
self.prev_kp = None
self.prev_des = None
self.prev_frame = None
self.f = focal

def process_frame(self, frame_gray):
if self.prev_frame is None:
self.prev_frame = frame_gray
self.prev_kp, self.prev_des = self.orb.detectAndCompute(frame_gray, None)
return 0.0, 0.0, 0.0
kp, des = self.orb.detectAndCompute(frame_gray, None)
if des is None or self.prev_des is None:
self.prev_frame = frame_gray
self.prev_kp, self.prev_des = kp, des
return 0.0, 0.0, 0.0
matches = self.bf.match(self.prev_des, des)
matches = sorted(matches, key=lambda x: x.distance)[:150]
if len(matches) < 8:
self.prev_frame = frame_gray
self.prev_kp, self.prev_des = kp, des
return 0.0, 0.0, 0.0
pts_prev = np.float32([ self.prev_kp[m.queryIdx].pt for m in matches ])
pts_cur = np.float32([ kp[m.trainIdx].pt for m in matches ])
E, mask = cv2.findEssentialMat(pts_cur, pts_prev, focal=self.f,
pp=(frame_gray.shape[1]/2, frame_gray.shape[0]/2),
method=cv2.RANSAC, prob=0.999, threshold=1.0)
if E is None:
self.prev_frame = frame_gray
self.prev_kp, self.prev_des = kp, des
return 0.0, 0.0, 0.0
_, R, t, mask = cv2.recoverPose(E, pts_cur, pts_prev, focal=self.f,
pp=(frame_gray.shape[1]/2, frame_gray.shape[0]/2))
# approximate 2D motion: forward ~ t_z, yaw ~ small yaw from R
if t is None:
dz = 0.0
else:
dz = float(t[2,0]) if t.shape==(3,1) else 0.0
yaw = math.atan2(R[1,0], R[0,0])
self.prev_frame = frame_gray
self.prev_kp, self.prev_des = kp, des
return dz, 0.0, yaw




Occupancy Grid SLAM (log-odds)

---------------------------

class OccupancyGrid:
def init(self, width_m=10.0, height_m=10.0, resolution=0.05):
self.resolution = resolution
self.width = int(width_m / resolution)
self.height = int(height_m / resolution)
self.log_odds = np.zeros((self.height, self.width), dtype=np.float32)
self.l_free = -0.4
self.l_occ = 0.85
self.origin_x = -width_m / 2.0
self.origin_y = -height_m / 2.0

def world_to_grid(self, x, y):
gx = int((x - self.origin_x) / self.resolution)
gy = int((y - self.origin_y) / self.resolution)
return gx, gy

def in_bounds(self, gx, gy):
return 0 <= gx < self.width and 0 <= gy < self.height

def bresenham(self, x0, y0, x1, y1):
points = []
dx = abs(x1 - x0)
sx = 1 if x0 < x1 else -1
dy = -abs(y1 - y0)
sy = 1 if y0 < y1 else -1
err = dx + dy
x, y = x0, y0
while True:
points.append((x,y))
if x == x1 and y == y1:
break
e2 = 2*err
if e2 >= dy:
err += dy
x += sx
if e2 <= dx:
err += dx
y += sy
return points

def integrate_scan(self, pose: Tuple[float,float,float], ranges: np.ndarray, angles: np.ndarray, max_range=5.0):
x, y, th = pose
for r, a in zip(ranges, angles):
beam_angle = th + a
ex = x + r * math.cos(beam_angle)
ey = y + r * math.sin(beam_angle)
gx0, gy0 = self.world_to_grid(x, y)
gx1, gy1 = self.world_to_grid(ex, ey)
points = self.bresenham(gx0, gy0, gx1, gy1)
# free along beam (except last)
for (gx, gy) in points[:-1]:
if self.in_bounds(gx, gy):
self.log_odds[gy, gx] += self.l_free
# last cell
gx, gy = points[-1]
if self.in_bounds(gx, gy) and r < (max_range - 0.01):
self.log_odds[gy, gx] += self.l_occ
self.log_odds = np.clip(self.log_odds, -8.0, 8.0)

def get_prob_map(self):
odds = np.exp(self.log_odds)
prob = odds / (1.0 + odds)
return prob




Planner (A*)

---------------------------

def heuristic(a, b):
return math.hypot(b[0]-a[0], b[1]-a[1])

def a_star(grid_prob: np.ndarray, start: Tuple[int,int], goal: Tuple[int,int], threshold=0.6) -> List[Tuple[int,int]]:
h, w = grid_prob.shape
def neighbors(node):
x, y = node
for dx, dy in [(-1,0),(1,0),(0,-1),(0,1),(-1,-1),(1,1),(-1,1),(1,-1)]:
nx, ny = x+dx, y+dy
if 0 <= nx < w and 0 <= ny < h and grid_prob[ny, nx] < threshold:
yield (nx, ny)
close_set = set()
came_from = {}
gscore = {start:0.0}
fscore = {start:heuristic(start, goal)}
oheap = []
heapq.heappush(oheap, (fscore[start], start))
while oheap:
_, current = heapq.heappop(oheap)
if current == goal:
path = [current]
while current in came_from:
current = came_from[current]
path.append(current)
path.reverse()
return path
close_set.add(current)
for neighbor in neighbors(current):
tentative_g = gscore[current] + heuristic(current, neighbor)
if neighbor in close_set and tentative_g >= gscore.get(neighbor, 1e9):
continue
if tentative_g < gscore.get(neighbor, 1e9) or neighbor not in [i[1] for i in oheap]:
came_from[neighbor] = current
gscore[neighbor] = tentative_g
fscore[neighbor] = tentative_g + heuristic(neighbor, goal)
heapq.heappush(oheap, (fscore[neighbor], neighbor))
return []

---------------------------

Controller (Pure Pursuit)

---------------------------

class PurePursuitController:
def init(self, lookahead_m=0.3, max_speed=0.2):
self.lookahead = lookahead_m
self.max_speed = max_speed

def compute_control(self, pose: Tuple[float,float,float], path_world: List[Tuple[float,float]], obstacle_dist: float):
x, y, th = pose
if len(path_world) == 0:
return 0.0, 0.0
distances = [math.hypot(px-x, py-y) for (px,py) in path_world]
idx = next((i for i,d in enumerate(distances) if d >= self.lookahead), len(distances)-1)
goal = path_world[idx]
dx = goal[0]-x
dy = goal[1]-y
# rotate into robot frame
x_r = math.cos(-th)dx - math.sin(-th)*dy
y_r = math.sin(-th)*dx + math.cos(-th)*dy
if abs(x_r) < 1e-6:
curvature = 0.0
else:
curvature = 2.0 * y_r / (self.lookahead
*2)
v = self.max_speed
if obstacle_dist < 0.4:
v = 0.0
elif obstacle_dist < 1.0:
v = self.max_speed * max(0.05, (obstacle_dist - 0.3) / 0.7)
omega = v * curvature
return v, omega




Dashboard (Flask)

---------------------------

app = Flask(name)
_state = {
'pose': (0.0, 0.0, 0.0),
'grid_prob': None,
'path': [],
'sensors': {}
}

@app.route('/')
def index():
return render_template_string("""

Robot Dashboard

SIH Robot — GPS-denied Dashboard



Pose




Sensors




<br> async function update(){<br> let p = await fetch(&#39;/state&#39;).then(r=&gt;r.json());<br> document.getElementById(&#39;pose&#39;).innerText = JSON.stringify(p.pose, null, 2);<br> document.getElementById(&#39;sensors&#39;).innerText = JSON.stringify(p.sensors, null, 2);<br> setTimeout(update, 500);<br> }<br> update();<br>


""")

@app.route('/state')
def state():
return jsonify(_state)

@app.route('/map.png')
def map_png():
grid = _state.get('grid_prob')
pose = _state.get('pose', (0,0,0))
path = _state.get('path', [])
plt.figure(figsize=(6.4,4.8))
if grid is None:
plt.imshow(np.ones((480,640)), origin='lower', cmap='gray')
else:
# grid is probability map in grid coordinates (height, width)
# Show with origin lower
plt.imshow(1.0 - grid, origin='lower', cmap='gray')
if len(path)>0:
xs = [p[0] for p in path]
ys = [p[1] for p in path]
plt.plot(xs, ys, '-r', linewidth=1.5)
x,y,th = pose
plt.scatter([x],[y], c='b')
plt.title('Occupancy Grid (white=free, black=occ)')
buf = io.BytesIO()
plt.savefig(buf, format='png')
plt.close()
buf.seek(0)
return Response(buf.read(), mimetype='image/png')

def dashboard_set_state(pose=None, grid_prob=None, path=None, sensors=None):
if pose is not None:
_state['pose'] = pose
if grid_prob is not None:
# dashboard expects world coords... we'll map path to world later
_state['grid_prob'] = grid_prob
if path is not None:
_state['path'] = path
if sensors is not None:
_state['sensors'] = sensors

---------------------------

Main robot orchestration

---------------------------

def run_dashboard_server():
# run Flask app in thread
app.run(host='0.0.0.0', port=5000, debug=False, use_reloader=False)

def main_loop(rate_hz=10):
dt = 1.0 / rate_hz
ekf = EKF()
vo = VisualOdometry()
grid = OccupancyGrid(width_m=8.0, height_m=8.0, resolution=0.05)
controller = PurePursuitController(lookahead_m=0.25, max_speed=0.18)
sensor_health = SensorHealth()
cam = CameraWrapper(0)

# Example goal 2 meters ahead in world coords
goal_world = (2.0, 0.0)

def world_to_grid_idx(x, y):
gx = int((x - grid.origin_x) / grid.resolution)
gy = int((y - grid.origin_y) / grid.resolution)
return (gx, gy)

def grid_idx_to_world(gx, gy):
x = grid.origin_x + gx * grid.resolution
y = grid.origin_y + gy * grid.resolution
return (x, y)

path_world = []

try:
while True:
start = time.time()
# --- sensor reads ---
wz, ax = read_imu()
sensor_health.update(imu=True)
d, dtheta = read_wheel_odometry()
sensor_health.update(odom=True)
ranges, angles = read_lidar_scan()
sensor_health.update(lidar=True)
frame = cam.read()
sensor_health.update(cam=True)
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # --- visual odometry ---
    dx_vo, dy_vo, yaw_vo = vo.process_frame(gray)

    # --- EKF predict (use wheel odom and small IMU blend) ---
    ekf.predict(d, dtheta + wz*dt*0.5)

    # --- occasionally update EKF with VO pseudo-measurement ---
    xek, yek, thek = ekf.get_state()
    dx_world = dx_vo * math.cos(thek) - dy_vo * math.sin(thek)
    dy_world = dx_vo * math.sin(thek) + dy_vo * math.cos(thek)
    z_pose = np.array([[xek + dx_world],[yek + dy_world],[thek + yaw_vo]])
    R_vo = np.diag([0.1, 0.1, 0.05])
    ekf.update_pose(z_pose, R_vo)

    # --- mapping (LIDAR into grid) ---
    pose = ekf.get_state()
    grid.integrate_scan(pose, ranges, angles, max_range=5.0)
    grid_prob = grid.get_prob_map()

    # --- planning (A*) on grid ---
    goal_g = world_to_grid_idx(goal_world[0], goal_world[1])
    gx, gy = world_to_grid_idx(pose[0], pose[1])
    start_g = (gx, gy)
    planned = []
    if grid.in_bounds(goal_g[0], goal_g[1]) and grid.in_bounds(start_g[0], start_g[1]):
        planned = a_star(grid_prob, start_g, goal_g, threshold=0.58)
    path_world = [ grid_idx_to_world(px, py) for (px,py) in planned ]

    # --- safety: estimate nearest obstacle ahead ---
    center_sector = ranges[len(ranges)//2 - 3 : len(ranges)//2 + 4]
    obstacle_dist = float(np.median(center_sector))

    # --- control ---
    v_cmd, omega_cmd = controller.compute_control(pose, path_world, obstacle_dist)

    # --- fail-safes ---
    health = sensor_health.ok(timeout=1.5)
    if not (health['lidar'] and health['odom']):
        v_cmd, omega_cmd = 0.0, 0.0
    if max(np.diag(ekf.P)) &gt; 4.0:
        v_cmd, omega_cmd = 0.0, 0.0

    # --- actuator output (replace with real actuator code) ---
    # send_actuator_commands(v_cmd, omega_cmd)
    print(f"[{time.strftime('%H:%M:%S')}] pose={tuple(np.round(pose,3))} v={v_cmd:.2f} w={omega_cmd:.2f} obs={obstacle_dist:.2f}")

    # --- update dashboard state ---
    dashboard_set_state(pose=tuple(pose),
                        grid_prob=grid_prob,
                        path=path_world,
                        sensors={
                            'obstacle_dist': obstacle_dist,
                            'vo_dx': float(dx_vo),
                            'vo_yaw': float(yaw_vo),
                            'ekf_P_diag': [float(x) for x in ekf.P.diagonal()],
                            'health': health
                        })

    elapsed = time.time() - start
    sleep = max(0.0, dt - elapsed)
    time.sleep(sleep)
Enter fullscreen mode Exit fullscreen mode

except KeyboardInterrupt:
cam.release()
print("Shutting down main loop")




Entrypoint

---------------------------

if name == 'main':
# start dashboard thread
t = threading.Thread(target=run_dashboard_server, daemon=True)
t.start()
print("Dashboard running at http://0.0.0.0:5000")
main_loop(rate_hz=10)

Top comments (0)