python3 -m venv venv
source venv/bin/activate
pip install opencv-python-headless numpy flask matplotlib scipy pillow
!/usr/bin/env python3
"""
robot.py
Single-file GPS-denied robot demo:
- EKF (wheel odom + IMU, occasional VO updates)
- Visual Odometry (ORB)
- Occupancy Grid mapping (log-odds)
- A* planner on the occupancy grid
- Pure Pursuit controller + safety stop
- Simple Flask dashboard showing occupancy, pose, sensors Sensor read functions are stubbed/simulated — replace with real drivers on real hardware. """
import time
import threading
import math
import io
import base64
from typing import List, Tuple
import numpy as np
import cv2
from flask import Flask, Response, render_template_string, jsonify
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from PIL import Image
import heapq
---------------------------
Sensor stubs / wrappers
---------------------------
class SensorHealth:
def init(self):
t = time.time()
self.last_imu = t
self.last_odom = t
self.last_cam = t
self.last_lidar = t
def update(self, imu=False, odom=False, cam=False, lidar=False):
now = time.time()
if imu: self.last_imu = now
if odom: self.last_odom = now
if cam: self.last_cam = now
if lidar: self.last_lidar = now
def ok(self, timeout=1.0):
now = time.time()
return {
'imu': (now - self.last_imu) < timeout,
'odom': (now - self.last_odom) < timeout,
'cam': (now - self.last_cam) < timeout,
'lidar': (now - self.last_lidar) < timeout
}
Replace these with real sensor calls on hardware
def read_imu():
"""
Return (wz, ax) = (angular velocity z in rad/s, linear accel x in m/s^2)
Replace with real IMU reading (e.g., MPU driver).
"""
wz = np.random.normal(0.0, 0.02)
ax = np.random.normal(0.0, 0.1)
return float(wz), float(ax)
def read_wheel_odometry():
"""
Return (delta_distance_m, delta_heading_rad) since last call.
Replace with encoder integration.
"""
# Simulated small forward motion per loop (for demo)
d = 0.02 + np.random.normal(0.0, 0.004)
dtheta = np.random.normal(0.0, 0.004)
return float(d), float(dtheta)
def read_lidar_scan(num_beams=180, max_range=5.0):
"""
Return (ranges, angles) arrays.
Replace with real lidar driver output.
This stub simulates a front obstacle.
"""
angles = np.linspace(-np.pi/2, np.pi/2, num_beams)
ranges = max_range * np.ones_like(angles)
center_idx = num_beams // 2
ranges[center_idx-3:center_idx+4] = 1.5 + np.random.normal(0.0, 0.03, 7)
ranges += np.random.normal(0.0, 0.02, size=angles.shape)
ranges = np.clip(ranges, 0.02, max_range)
return ranges.astype(float), angles.astype(float)
class CameraWrapper:
def init(self, source=0, width=320, height=240):
self.cap = cv2.VideoCapture(source)
# attempt to set size (may fail on some cameras)
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
self.last_frame = None
def read(self):
ret, frame = self.cap.read()
if not ret or frame is None:
# return blank frame to avoid breaking VO
frame = np.zeros((240,320,3), dtype=np.uint8)
self.last_frame = frame
return frame
def release(self):
self.cap.release()
EKF (3-state)
---------------------------
class EKF:
def init(self):
self.x = np.zeros((3,1)) # x, y, theta
self.P = np.diag([0.1, 0.1, 0.05])
# small process noise
self.Q = np.diag([0.02, 0.02, 0.01])
def predict(self, delta_dist, delta_theta):
x, y, th = self.x.flatten()
th_new = th + delta_theta
x_new = x + delta_dist * np.cos(th_new)
y_new = y + delta_dist * np.sin(th_new)
self.x = np.array([[x_new],[y_new],[th_new]])
# simple covariance increase
self.P = self.P + self.Q
def update_pose(self, z, R):
"""
z: 3x1 measurement
R: 3x3 measurement covariance
"""
H = np.eye(3)
y = z - H.dot(self.x)
# normalize angle residual
y[2] = (y[2] + np.pi) % (2*np.pi) - np.pi
S = H.dot(self.P).dot(H.T) + R
K = self.P.dot(H.T).dot(np.linalg.inv(S))
self.x = self.x + K.dot(y)
self.P = (np.eye(3) - K.dot(H)).dot(self.P)
def get_state(self):
return self.x.flatten()
Visual Odometry (ORB)
---------------------------
class VisualOdometry:
def init(self, max_features=1000, focal=300.0):
self.orb = cv2.ORB_create(max_features)
self.bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
self.prev_kp = None
self.prev_des = None
self.prev_frame = None
self.f = focal
def process_frame(self, frame_gray):
if self.prev_frame is None:
self.prev_frame = frame_gray
self.prev_kp, self.prev_des = self.orb.detectAndCompute(frame_gray, None)
return 0.0, 0.0, 0.0
kp, des = self.orb.detectAndCompute(frame_gray, None)
if des is None or self.prev_des is None:
self.prev_frame = frame_gray
self.prev_kp, self.prev_des = kp, des
return 0.0, 0.0, 0.0
matches = self.bf.match(self.prev_des, des)
matches = sorted(matches, key=lambda x: x.distance)[:150]
if len(matches) < 8:
self.prev_frame = frame_gray
self.prev_kp, self.prev_des = kp, des
return 0.0, 0.0, 0.0
pts_prev = np.float32([ self.prev_kp[m.queryIdx].pt for m in matches ])
pts_cur = np.float32([ kp[m.trainIdx].pt for m in matches ])
E, mask = cv2.findEssentialMat(pts_cur, pts_prev, focal=self.f,
pp=(frame_gray.shape[1]/2, frame_gray.shape[0]/2),
method=cv2.RANSAC, prob=0.999, threshold=1.0)
if E is None:
self.prev_frame = frame_gray
self.prev_kp, self.prev_des = kp, des
return 0.0, 0.0, 0.0
_, R, t, mask = cv2.recoverPose(E, pts_cur, pts_prev, focal=self.f,
pp=(frame_gray.shape[1]/2, frame_gray.shape[0]/2))
# approximate 2D motion: forward ~ t_z, yaw ~ small yaw from R
if t is None:
dz = 0.0
else:
dz = float(t[2,0]) if t.shape==(3,1) else 0.0
yaw = math.atan2(R[1,0], R[0,0])
self.prev_frame = frame_gray
self.prev_kp, self.prev_des = kp, des
return dz, 0.0, yaw
Occupancy Grid SLAM (log-odds)
---------------------------
class OccupancyGrid:
def init(self, width_m=10.0, height_m=10.0, resolution=0.05):
self.resolution = resolution
self.width = int(width_m / resolution)
self.height = int(height_m / resolution)
self.log_odds = np.zeros((self.height, self.width), dtype=np.float32)
self.l_free = -0.4
self.l_occ = 0.85
self.origin_x = -width_m / 2.0
self.origin_y = -height_m / 2.0
def world_to_grid(self, x, y):
gx = int((x - self.origin_x) / self.resolution)
gy = int((y - self.origin_y) / self.resolution)
return gx, gy
def in_bounds(self, gx, gy):
return 0 <= gx < self.width and 0 <= gy < self.height
def bresenham(self, x0, y0, x1, y1):
points = []
dx = abs(x1 - x0)
sx = 1 if x0 < x1 else -1
dy = -abs(y1 - y0)
sy = 1 if y0 < y1 else -1
err = dx + dy
x, y = x0, y0
while True:
points.append((x,y))
if x == x1 and y == y1:
break
e2 = 2*err
if e2 >= dy:
err += dy
x += sx
if e2 <= dx:
err += dx
y += sy
return points
def integrate_scan(self, pose: Tuple[float,float,float], ranges: np.ndarray, angles: np.ndarray, max_range=5.0):
x, y, th = pose
for r, a in zip(ranges, angles):
beam_angle = th + a
ex = x + r * math.cos(beam_angle)
ey = y + r * math.sin(beam_angle)
gx0, gy0 = self.world_to_grid(x, y)
gx1, gy1 = self.world_to_grid(ex, ey)
points = self.bresenham(gx0, gy0, gx1, gy1)
# free along beam (except last)
for (gx, gy) in points[:-1]:
if self.in_bounds(gx, gy):
self.log_odds[gy, gx] += self.l_free
# last cell
gx, gy = points[-1]
if self.in_bounds(gx, gy) and r < (max_range - 0.01):
self.log_odds[gy, gx] += self.l_occ
self.log_odds = np.clip(self.log_odds, -8.0, 8.0)
def get_prob_map(self):
odds = np.exp(self.log_odds)
prob = odds / (1.0 + odds)
return prob
Planner (A*)
---------------------------
def heuristic(a, b):
return math.hypot(b[0]-a[0], b[1]-a[1])
def a_star(grid_prob: np.ndarray, start: Tuple[int,int], goal: Tuple[int,int], threshold=0.6) -> List[Tuple[int,int]]:
h, w = grid_prob.shape
def neighbors(node):
x, y = node
for dx, dy in [(-1,0),(1,0),(0,-1),(0,1),(-1,-1),(1,1),(-1,1),(1,-1)]:
nx, ny = x+dx, y+dy
if 0 <= nx < w and 0 <= ny < h and grid_prob[ny, nx] < threshold:
yield (nx, ny)
close_set = set()
came_from = {}
gscore = {start:0.0}
fscore = {start:heuristic(start, goal)}
oheap = []
heapq.heappush(oheap, (fscore[start], start))
while oheap:
_, current = heapq.heappop(oheap)
if current == goal:
path = [current]
while current in came_from:
current = came_from[current]
path.append(current)
path.reverse()
return path
close_set.add(current)
for neighbor in neighbors(current):
tentative_g = gscore[current] + heuristic(current, neighbor)
if neighbor in close_set and tentative_g >= gscore.get(neighbor, 1e9):
continue
if tentative_g < gscore.get(neighbor, 1e9) or neighbor not in [i[1] for i in oheap]:
came_from[neighbor] = current
gscore[neighbor] = tentative_g
fscore[neighbor] = tentative_g + heuristic(neighbor, goal)
heapq.heappush(oheap, (fscore[neighbor], neighbor))
return []
---------------------------
Controller (Pure Pursuit)
---------------------------
class PurePursuitController:
def init(self, lookahead_m=0.3, max_speed=0.2):
self.lookahead = lookahead_m
self.max_speed = max_speed
def compute_control(self, pose: Tuple[float,float,float], path_world: List[Tuple[float,float]], obstacle_dist: float):
x, y, th = pose
if len(path_world) == 0:
return 0.0, 0.0
distances = [math.hypot(px-x, py-y) for (px,py) in path_world]
idx = next((i for i,d in enumerate(distances) if d >= self.lookahead), len(distances)-1)
goal = path_world[idx]
dx = goal[0]-x
dy = goal[1]-y
# rotate into robot frame
x_r = math.cos(-th)dx - math.sin(-th)*dy
y_r = math.sin(-th)*dx + math.cos(-th)*dy
if abs(x_r) < 1e-6:
curvature = 0.0
else:
curvature = 2.0 * y_r / (self.lookahead*2)
v = self.max_speed
if obstacle_dist < 0.4:
v = 0.0
elif obstacle_dist < 1.0:
v = self.max_speed * max(0.05, (obstacle_dist - 0.3) / 0.7)
omega = v * curvature
return v, omega
Dashboard (Flask)
---------------------------
app = Flask(name)
_state = {
'pose': (0.0, 0.0, 0.0),
'grid_prob': None,
'path': [],
'sensors': {}
}
@app.route('/')
def index():
return render_template_string("""
SIH Robot — GPS-denied Dashboard

Pose
Sensors
<br> async function update(){<br> let p = await fetch('/state').then(r=>r.json());<br> document.getElementById('pose').innerText = JSON.stringify(p.pose, null, 2);<br> document.getElementById('sensors').innerText = JSON.stringify(p.sensors, null, 2);<br> setTimeout(update, 500);<br> }<br> update();<br>
""")
@app.route('/state')
def state():
return jsonify(_state)
@app.route('/map.png')
def map_png():
grid = _state.get('grid_prob')
pose = _state.get('pose', (0,0,0))
path = _state.get('path', [])
plt.figure(figsize=(6.4,4.8))
if grid is None:
plt.imshow(np.ones((480,640)), origin='lower', cmap='gray')
else:
# grid is probability map in grid coordinates (height, width)
# Show with origin lower
plt.imshow(1.0 - grid, origin='lower', cmap='gray')
if len(path)>0:
xs = [p[0] for p in path]
ys = [p[1] for p in path]
plt.plot(xs, ys, '-r', linewidth=1.5)
x,y,th = pose
plt.scatter([x],[y], c='b')
plt.title('Occupancy Grid (white=free, black=occ)')
buf = io.BytesIO()
plt.savefig(buf, format='png')
plt.close()
buf.seek(0)
return Response(buf.read(), mimetype='image/png')
def dashboard_set_state(pose=None, grid_prob=None, path=None, sensors=None):
if pose is not None:
_state['pose'] = pose
if grid_prob is not None:
# dashboard expects world coords... we'll map path to world later
_state['grid_prob'] = grid_prob
if path is not None:
_state['path'] = path
if sensors is not None:
_state['sensors'] = sensors
---------------------------
Main robot orchestration
---------------------------
def run_dashboard_server():
# run Flask app in thread
app.run(host='0.0.0.0', port=5000, debug=False, use_reloader=False)
def main_loop(rate_hz=10):
dt = 1.0 / rate_hz
ekf = EKF()
vo = VisualOdometry()
grid = OccupancyGrid(width_m=8.0, height_m=8.0, resolution=0.05)
controller = PurePursuitController(lookahead_m=0.25, max_speed=0.18)
sensor_health = SensorHealth()
cam = CameraWrapper(0)
# Example goal 2 meters ahead in world coords
goal_world = (2.0, 0.0)
def world_to_grid_idx(x, y):
gx = int((x - grid.origin_x) / grid.resolution)
gy = int((y - grid.origin_y) / grid.resolution)
return (gx, gy)
def grid_idx_to_world(gx, gy):
x = grid.origin_x + gx * grid.resolution
y = grid.origin_y + gy * grid.resolution
return (x, y)
path_world = []
try:
while True:
start = time.time()
# --- sensor reads ---
wz, ax = read_imu()
sensor_health.update(imu=True)
d, dtheta = read_wheel_odometry()
sensor_health.update(odom=True)
ranges, angles = read_lidar_scan()
sensor_health.update(lidar=True)
frame = cam.read()
sensor_health.update(cam=True)
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# --- visual odometry ---
dx_vo, dy_vo, yaw_vo = vo.process_frame(gray)
# --- EKF predict (use wheel odom and small IMU blend) ---
ekf.predict(d, dtheta + wz*dt*0.5)
# --- occasionally update EKF with VO pseudo-measurement ---
xek, yek, thek = ekf.get_state()
dx_world = dx_vo * math.cos(thek) - dy_vo * math.sin(thek)
dy_world = dx_vo * math.sin(thek) + dy_vo * math.cos(thek)
z_pose = np.array([[xek + dx_world],[yek + dy_world],[thek + yaw_vo]])
R_vo = np.diag([0.1, 0.1, 0.05])
ekf.update_pose(z_pose, R_vo)
# --- mapping (LIDAR into grid) ---
pose = ekf.get_state()
grid.integrate_scan(pose, ranges, angles, max_range=5.0)
grid_prob = grid.get_prob_map()
# --- planning (A*) on grid ---
goal_g = world_to_grid_idx(goal_world[0], goal_world[1])
gx, gy = world_to_grid_idx(pose[0], pose[1])
start_g = (gx, gy)
planned = []
if grid.in_bounds(goal_g[0], goal_g[1]) and grid.in_bounds(start_g[0], start_g[1]):
planned = a_star(grid_prob, start_g, goal_g, threshold=0.58)
path_world = [ grid_idx_to_world(px, py) for (px,py) in planned ]
# --- safety: estimate nearest obstacle ahead ---
center_sector = ranges[len(ranges)//2 - 3 : len(ranges)//2 + 4]
obstacle_dist = float(np.median(center_sector))
# --- control ---
v_cmd, omega_cmd = controller.compute_control(pose, path_world, obstacle_dist)
# --- fail-safes ---
health = sensor_health.ok(timeout=1.5)
if not (health['lidar'] and health['odom']):
v_cmd, omega_cmd = 0.0, 0.0
if max(np.diag(ekf.P)) > 4.0:
v_cmd, omega_cmd = 0.0, 0.0
# --- actuator output (replace with real actuator code) ---
# send_actuator_commands(v_cmd, omega_cmd)
print(f"[{time.strftime('%H:%M:%S')}] pose={tuple(np.round(pose,3))} v={v_cmd:.2f} w={omega_cmd:.2f} obs={obstacle_dist:.2f}")
# --- update dashboard state ---
dashboard_set_state(pose=tuple(pose),
grid_prob=grid_prob,
path=path_world,
sensors={
'obstacle_dist': obstacle_dist,
'vo_dx': float(dx_vo),
'vo_yaw': float(yaw_vo),
'ekf_P_diag': [float(x) for x in ekf.P.diagonal()],
'health': health
})
elapsed = time.time() - start
sleep = max(0.0, dt - elapsed)
time.sleep(sleep)
except KeyboardInterrupt:
cam.release()
print("Shutting down main loop")
Entrypoint
---------------------------
if name == 'main':
# start dashboard thread
t = threading.Thread(target=run_dashboard_server, daemon=True)
t.start()
print("Dashboard running at http://0.0.0.0:5000")
main_loop(rate_hz=10)
Top comments (0)