AI 自动驾驶与端到端深度学习完全指南:Tesla FSD、Waymo、端到端方案实战
前言
自动驾驶正在从"模块化"走向"端到端"。Tesla FSD V12 证明了纯视觉端到端方案的可行性,Waymo 则在 Robotaxi 领域持续深耕。2026年,端到端自动驾驶已经成为行业共识。
自动驾驶技术概述
自动驾驶技术架构对比:
1. 模块化方案 (传统)
- 感知 → 规划 → 控制
- 优点:可解释性强、易调试
- 限制:误差累积、规则复杂
- 代表:大多数传统车企
2. 端到端方案 (Tesla FSD V12)
- 传感器输入 → 驾驶输出
- 优点:无误差累积、类人驾驶
- 限制:可解释性差、需大量数据
- 代表:Tesla、Wayve
- 端到端 + 规则兜底
- 优点:平衡安全与性能
- 代表:华为、小鹏
4. 多模态大模型方案
- VLM + 驾驶策略
- 优点:泛化能力强
- 代表:DriveGPT、UniAD
Tesla FSD 架构
FSD 核心实现
import torch
import torch.nn as nn
from typing import Dict, List, Tuple, Optional
import numpy as np
class OccupancyNetwork:
"""占用网络 (Occupancy Network)"""
def __init__(self, voxel_size: float = 0.1):
self.voxel_size = voxel_size
self.encoder = None
self.decoder = None
def build_encoder(self, input_channels: int = 12):
"""构建编码器"""
self.encoder = nn.Sequential(
nn.Conv3d(input_channels, 64, kernel_size=3, padding=1),
nn.BatchNorm3d(64),
nn.ReLU(inplace=True),
nn.MaxPool3d(2),
nn.Conv3d(64, 128, kernel_size=3, padding=1),
nn.BatchNorm3d(128),
nn.ReLU(inplace=True),
nn.MaxPool3d(2),
nn.Conv3d(128, 256, kernel_size=3, padding=1),
nn.BatchNorm3d(256),
nn.ReLU(inplace=True),
def voxelize(
points: np.ndarray,
grid_range: Tuple[float, float, float] = (-50, 50, -50, 50, -5, 5)
) -> torch.Tensor:
points: (N, 3) 点云
grid_range: (x_min, x_max, y_min, y_max, z_min, z_max)
x_min, x_max, y_min, y_max, z_min, z_max = grid_range
voxel_x = ((points[:, 0] - x_min) / self.voxel_size).astype(int)
voxel_y = ((points[:, 1] - y_min) / self.voxel_size).astype(int)
voxel_z = ((points[:, 2] - z_min) / self.voxel_size).astype(int)
(voxel_x >= 0) & (voxel_x < int((x_max - x_min) / self.voxel_size)) &
(voxel_y >= 0) & (voxel_y < int((y_max - y_min) / self.voxel_size)) &
(voxel_z >= 0) & (voxel_z < int((z_max - z_min) / self.voxel_size))
return points[valid], voxel_x[valid], voxel_y[valid], voxel_z[valid]
def forward(self, points: np.ndarray) -> Dict:
valid_points, vx, vy, vz = self.voxelize(points)
grid_size = (
int(100 / self.voxel_size),
int(100 / self.voxel_size),
int(10 / self.voxel_size)
voxel_grid = torch.zeros(grid_size, dtype=torch.float32)
# 填充体素 (简化为密度)
for i in range(len(valid_points)):
voxel_grid[vx[i], vy[i], vz[i]] += 1
voxel_tensor = voxel_grid.unsqueeze(0).unsqueeze(0) # (1, 1, D, H, W)
features = self.encoder(voxel_tensor)
"occupancy": features,
"points": valid_points
class BirdEyeView:
"""鸟瞰图 (BEV) 特征提取"""
def __init__(self, feature_dim: int = 256):
self.feature_dim = feature_dim
def build_bev(
multi_scale_features: List[torch.Tensor],
target_size: Tuple[int, int] = (200, 200)
) -> torch.Tensor:
multi_scale_features: 多尺度特征列表
target_size: (H, W)
fused = torch.zeros(
1, self.feature_dim, target_size[0], target_size[1]
for feat in multi_scale_features:
feat_up = torch.nn.functional.interpolate(
size=target_size,
mode='bilinear',
align_corners=False
fused += feat_up
return fused
def bev_to_image_coords(
bev_coords: np.ndarray,
origin: Tuple[float, float] = (0, 0),
resolution: float = 0.1
) -> np.ndarray:
"""BEV 坐标转图像坐标"""
x, y = bev_coords[:, 0], bev_coords[:, 1]
img_x = ((x - origin[0]) / resolution).astype(int)
img_y = ((y - origin[1]) / resolution).astype(int)
return np.stack([img_x, img_y], axis=-1)
class PlanningNetwork(nn.Module):
def __init__(self, input_dim: int = 512, hidden_dim: int = 256):
super().__init__()
self.planner = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.ReLU(inplace=True),
nn.Dropout(0.1),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(inplace=True),
nn.Dropout(0.1),
nn.Linear(hidden_dim, 2), # (steering, throttle)
def forward(
bev_features: torch.Tensor,
route_features: torch.Tensor,
traffic_features: torch.Tensor
) -> Dict[str, torch.Tensor]:
bev_features: BEV 特征
route_features: 路由特征
traffic_features: 交通状态特征
{"trajectory": ..., "control": ...}
bev_flat = bev_features.flatten(1)
combined = torch.cat([
route_features,
traffic_features
trajectory = self.planner(combined)
control = torch.tanh(trajectory)
"trajectory": trajectory,
"control": control
class FSDStackedHourglass(nn.Module):
"""FSD 沙漏网络"""
def __init__(self, num_joints: int = 2):
super().__init__()
self.num_joints = num_joints
self.encoder = nn.Sequential(
nn.Conv2d(12, 64, kernel_size=7, stride=2, padding=3),
nn.BatchNorm2d(64),
nn.ReLU(inplace=True),
nn.MaxPool2d(2),
self._make_layer(64, 128),
self._make_layer(128, 128),
self._make_layer(128, 256),
self.hourglass = self._make_hourglass(256, num_joints)
def _make_layer(self, in_ch: int, out_ch: int) -> nn.Module:
return nn.Sequential(
nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
nn.BatchNorm2d(out_ch),
nn.ReLU(inplace=True),
nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1),
nn.BatchNorm2d(out_ch),
nn.ReLU(inplace=True),
def _make_hourglass(self, channels: int, num_joints: int) -> nn.Module:
"""构建沙漏模块"""
return nn.Sequential(
self._make_layer(channels, channels),
self._make_layer(channels, channels),
nn.MaxPool2d(2),
self._make_layer(channels, channels * 2),
self._make_layer(channels * 2, channels * 2),
nn.MaxPool2d(2),
self._make_layer(channels * 2, channels * 2),
nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False),
self._make_layer(channels * 2, channels * 2),
nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False),
nn.Conv2d(channels * 2, num_joints, kernel_size=1),
def forward(self, x: torch.Tensor) -> List[torch.Tensor]:
encoded = self.encoder(x)
outputs = self.hourglass(encoded)
return outputs
端到端自动驾驶
UniAD 架构
class UniAD(nn.Module):
"""UniAD 端到端自动驾驶"""
def __init__(self, config: Dict):
super().__init__()
self.config = config
self.backbone = self._build_backbone()
self.neck = self._build_neck()
self.bev_encoder = BEVEncoder(
in_channels=512,
out_channels=config.get("bev_channels", 256)
self.map_head = MapHead(config)
self.detection_head = DetectionHead(config)
self.motion_head = MotionHead(config)
self.planning_head = PlanningHead(config)
def _build_backbone(self) -> nn.Module:
"""构建骨干网络"""
return nn.Sequential(
nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
nn.BatchNorm2d(64),
nn.ReLU(inplace=True),
nn.MaxPool2d(3, stride=2, padding=1),
# ResNet blocks
self._make_resblock(64, 64, num_blocks=3),
self._make_resblock(64, 128, num_blocks=4, stride=2),
self._make_resblock(128, 256, num_blocks=6, stride=2),
self._make_resblock(256, 512, num_blocks=3, stride=2),
def _make_resblock(
in_ch: int,
out_ch: int,
num_blocks: int,
stride: int = 1
) -> nn.Module:
"""ResNet block"""
layers = []
layers.append(nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=stride, padding=1))
layers.append(nn.BatchNorm2d(out_ch))
layers.append(nn.ReLU(inplace=True))
for _ in range(num_blocks - 1):
layers.append(nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1))
layers.append(nn.BatchNorm2d(out_ch))
layers.append(nn.ReLU(inplace=True))
return nn.Sequential(*layers)
def _build_neck(self) -> nn.Module:
"""构建 Neck (FPN)"""
return nn.Sequential(
nn.Conv2d(512, 256, kernel_size=1),
nn.BatchNorm2d(256),
nn.ReLU(inplace=True),
nn.Conv2d(256, 256, kernel_size=3, padding=1),
nn.BatchNorm2d(256),
nn.ReLU(inplace=True),
def forward(
images: torch.Tensor,
intrinsics: torch.Tensor,
extrinsics: torch.Tensor
) -> Dict[str, torch.Tensor]:
images: (B, N, 3, H, W) 多视角图像
intrinsics: (B, N, 3, 3) 内参
extrinsics: (B, N, 4, 4) 外参
"detection": {...},
"tracking": {...},
"map": {...},
"motion": {...},
"planning": {...}
B, N, C, H, W = images.shape
multi_view_features = []
for i in range(N):
img = images[:, i] # (B, 3, H, W)
feat = self.backbone(img) # (B, 512, H', W')
feat = self.neck(feat)
multi_view_features.append(feat)
# 视角融合 + BEV
bev_features = self.bev_encoder(
multi_view_features,
intrinsics,
detection = self.detection_head(bev_features)
tracking = self.detection_head(bev_features) # 简化
map_pred = self.map_head(bev_features)
motion = self.motion_head(
bev_features,
detection["boxes"],
map_pred["lanes"]
planning = self.planning_head(
bev_features,
motion["trajectories"]
"detection": detection,
"tracking": tracking,
"map": map_pred,
"motion": motion,
"planning": planning
class BEVEncoder(nn.Module):
"""BEV 编码器"""
def __init__(self, in_channels: int, out_channels: int):
super().__init__()
self.encoder = nn.Sequential(
nn.Conv2d(in_channels, 128, kernel_size=3, padding=1),
nn.BatchNorm2d(128),
nn.ReLU(inplace=True),
nn.Conv2d(128, out_channels, kernel_size=3, padding=1),
nn.BatchNorm2d(out_channels),
nn.ReLU(inplace=True),
def forward(
multi_view_features: List[torch.Tensor],
intrinsics: torch.Tensor,
extrinsics: torch.Tensor
) -> torch.Tensor:
multi_view_features: 多视角特征列表
intrinsics: 内参
extrinsics: 外参
# 实际使用 transformer 进行视角变换
fused = torch.stack(multi_view_features, dim=1).mean(dim=1) # (B, C, H, W)
bev = self.encoder(fused)
class MapHead(nn.Module):
"""地图感知头"""
def __init__(self, config: Dict):
super().__init__()
self.lane_head = LaneDetectionHead(config)
self.segmentation_head = MapSegmentationHead(config)
def forward(self, bev_features: torch.Tensor) -> Dict[str, torch.Tensor]:
lanes = self.lane_head(bev_features)
segmentation = self.segmentation_head(bev_features)
"lanes": lanes,
"segmentation": segmentation
class LaneDetectionHead(nn.Module):
"""车道线检测头"""
def __init__(self, config: Dict):
super().__init__()
self.head = nn.Sequential(
nn.Conv2d(256, 128, kernel_size=3, padding=1),
nn.BatchNorm2d(128),
nn.ReLU(inplace=True),
nn.Conv2d(128, 64, kernel_size=3, padding=1),
nn.BatchNorm2d(64),
nn.ReLU(inplace=True),
self.confidence = nn.Conv2d(64, 1, kernel_size=1)
self.offset = nn.Conv2d(64, 2, kernel_size=1) # 偏移
def forward(self, x: torch.Tensor) -> Dict[str, torch.Tensor]:
feat = self.head(x)
confidence = torch.sigmoid(self.confidence(feat))
offset = self.offset(feat)
"confidence": confidence,
"offset": offset
class MotionHead(nn.Module):
"""运动预测头"""
def __init__(self, config: Dict):
super().__init__()
self.num_agents = config.get("num_agents", 64)
self.future_frames = config.get("future_frames", 40)
self.motion_encoder = nn.GRU(
input_size=256,
hidden_size=256,
num_layers=2,
batch_first=True
self.trajectory_head = nn.Linear(256, self.future_frames * 2)
def forward(
bev_features: torch.Tensor,
detection_boxes: torch.Tensor,
map_lanes: torch.Tensor
) -> Dict[str, torch.Tensor]:
detection_boxes: (B, N, 5) [x, y, w, h, angle]
map_lanes: 地图信息
B = bev_features.shape[0]
# 简化:用检测框特征
agent_features = detection_boxes.flatten(1) # (B, N*5)
encoded, _ = self.motion_encoder(agent_features)
trajectories = self.trajectory_head(encoded)
trajectories = trajectories.view(B, -1, self.future_frames, 2)
"trajectories": trajectories,
"probabilities": torch.softmax(torch.randn(B, self.num_agents), dim=-1)
class PlanningHead(nn.Module):
def __init__(self, config: Dict):
super().__init__()
self.planner = nn.Sequential(
nn.Linear(512, 256),
nn.ReLU(inplace=True),
nn.Linear(256, 128),
nn.ReLU(inplace=True),
nn.Linear(128, 2), # (x, y) 轨迹点
def forward(
bev_features: torch.Tensor,
motion_trajectories: torch.Tensor
) -> Dict[str, torch.Tensor]:
motion_trajectories: (B, N, T, 2) 预测轨迹
B = bev_features.shape[0]
bev_flat = bev_features.flatten(1)
motion_flat = motion_trajectories.flatten(1)
combined = torch.cat([bev_flat, motion_flat], dim=-1)
planning = self.planner(combined)
"trajectory": planning.view(B, -1, 2), # (B, T, 2)
"waypoints": planning.view(B, -1, 2)
数据处理
自动驾驶数据管道
import numpy as np
from typing import Dict, List, Tuple, Optional
import pickle
class DatasetLoader:
"""数据加载器"""
def __init__(
data_root: str,
split: str = "train"
self.data_root = data_root
self.split = split
self.scene_ids = self._load_scene_list()
def _load_scene_list(self) -> List[str]:
"""加载场景列表"""
# 简化:从文件系统读取
def load_frame(self, scene_id: str, frame_id: int) -> Dict:
"""加载单帧数据"""
camera_data = self._load_camera(scene_id, frame_id)
lidar_data = self._load_lidar(scene_id, frame_id)
canbus_data = self._load_canbus(scene_id, frame_id)
annotation = self._load_annotation(scene_id, frame_id)
"cameras": camera_data,
"lidar": lidar_data,
"canbus": canbus_data,
"annotation": annotation
def _load_camera(self, scene_id: str, frame_id: int) -> Dict[str, np.ndarray]:
"""加载相机数据"""
def _load_lidar(self, scene_id: str, frame_id: int) -> np.ndarray:
"""加载激光雷达数据"""
return np.zeros((10000, 4))
def _load_canbus(self, scene_id: str, frame_id: int) -> Dict:
"""加载车辆 CAN 总线数据"""
"speed": 0.0,
"steering": 0.0,
"acceleration": 0.0,
"heading": 0.0
def _load_annotation(self, scene_id: str, frame_id: int) -> Dict:
"boxes_3d": [], # 3D 检测框
"track_ids": [],
"traffic_lights": [],
"lanes": []
class DataAugmentation:
@staticmethod
def random_flip(
image: np.ndarray,
boxes: np.ndarray,
prob: float = 0.5
) -> Tuple[np.ndarray, np.ndarray]:
"""随机水平翻转"""
if np.random.random() < prob:
image = np.flip(image, axis=1).copy()
if len(boxes) > 0:
boxes[:, [0, 2]] = image.shape[1] - boxes[:, [2, 0]]
return image, boxes
@staticmethod
def random_scaling(
image: np.ndarray,
scale_range: Tuple[float, float] = (0.9, 1.1)
) -> np.ndarray:
scale = np.random.uniform(*scale_range)
from scipy.ndimage import zoom
h, w = image.shape[:2]
scaled_h = int(h * scale)
scaled_w = int(w * scale)
scaled = zoom(image, (scaled_h/h, scaled_w/w, 1))
# 裁剪或填充到原尺寸
result = np.zeros_like(image)
if scale >= 1:
start_h = (scaled_h - h) // 2
start_w = (scaled_w - w) // 2
result = scaled[start_h:start_h+h, start_w:start_w+w]
start_h = (h - scaled_h) // 2
start_w = (w - scaled_w) // 2
result[start_h:start_h+scaled_h, start_w:start_w+scaled_w] = scaled
return result
@staticmethod
def random_rotation(
image: np.ndarray,
angle_range: Tuple[float, float] = (-15, 15)
) -> np.ndarray:
from scipy.ndimage import rotate
angle = np.random.uniform(*angle_range)
rotated = rotate(image, angle, reshape=False)
return rotated
class BEVGenerator:
"""BEV 视角生成器"""
def __init__(
x_range: Tuple[float, float] = (-50, 50),
y_range: Tuple[float, float] = (-50, 50),
z_range: Tuple[float, float] = (-5, 5),
resolution: float = 0.1
self.x_range = x_range
self.y_range = y_range
self.z_range = z_range
self.resolution = resolution
self.grid_size = (
int((x_range[1] - x_range[0]) / resolution),
int((y_range[1] - y_range[0]) / resolution),
int((z_range[1] - z_range[0]) / resolution)
def points_to_bev(
points: np.ndarray,
intensity: np.ndarray = None
) -> np.ndarray:
points: (N, 3) [x, y, z]
intensity: (N,) 反射强度
(C, H, W) BEV 特征图
# 初始化网格 (使用反射强度作为通道)
height_channels = self.grid_size[2]
channels = height_channels + (1 if intensity is not None else 0)
bev = np.zeros((channels, self.grid_size[0], self.grid_size[1]))
x_idx = ((points[:, 0] - self.x_range[0]) / self.resolution).astype(int)
y_idx = ((points[:, 1] - self.y_range[0]) / self.resolution).astype(int)
z_idx = ((points[:, 2] - self.z_range[0]) / self.resolution).astype(int)
(x_idx >= 0) & (x_idx < self.grid_size[0]) &
(y_idx >= 0) & (y_idx < self.grid_size[1]) &
(z_idx >= 0) & (z_idx < self.grid_size[2])
x_idx = x_idx[valid]
y_idx = y_idx[valid]
z_idx = z_idx[valid]
for i in range(len(x_idx)):
if z_idx[i] >= 0 and z_idx[i] < self.grid_size[2]:
bev[z_idx[i], x_idx[i], y_idx[i]] = 1.0
if intensity is not None:
intensity_valid = intensity[valid]
bev[-1, x_idx, y_idx] = intensity_valid
仿真与测试
自动驾驶仿真
import numpy as np
from typing import Dict, List, Tuple
from dataclasses import dataclass
class VehicleState:
heading: float
speed: float
steering: float
class TrafficActor:
"""交通参与者"""
vehicle_id: str
heading: float
speed: float
vehicle_type: str # car, pedestrian, cyclist
class DrivingSimulator:
"""驾驶仿真器"""
def __init__(
dt: float = 0.1,
map_size: Tuple[float, float] = (200, 200)
self.dt = dt
self.map_size = map_size
self.ego_vehicle = None
self.traffic_actors: List[TrafficActor] = []
self.traffic_lights = []
def set_ego(self, state: VehicleState):
"""设置自车状态"""
self.ego_vehicle = state
def add_traffic(self, actor: TrafficActor):
"""添加交通参与者"""
self.traffic_actors.append(actor)
def step(self, control: Dict[str, float]) -> Dict:
control: {"steering": ..., "throttle": ..., "brake": ...}
{"observation": ..., "reward": ..., "done": ...}
self._update_ego(control)
self._update_traffic()
observation = self._get_observation()
reward = self._compute_reward()
done = self._check_done()
"observation": observation,
"reward": reward,
"done": done
def _update_ego(self, control: Dict[str, float]):
if self.ego_vehicle is None:
wheelbase = 2.7 # 轴距
steering = control.get("steering", 0)
throttle = control.get("throttle", 0)
brake = control.get("brake", 0)
speed = self.ego_vehicle.speed
acceleration = throttle * 2.0 - brake * 5.0
new_speed = speed + acceleration * self.dt
new_speed = max(0, new_speed) # 不倒车
heading = self.ego_vehicle.heading
new_heading = heading + new_speed / wheelbase * np.tan(steering) * self.dt
new_x = self.ego_vehicle.x + new_speed * np.cos(new_heading) * self.dt
new_y = self.ego_vehicle.y + new_speed * np.sin(new_heading) * self.dt
self.ego_vehicle = VehicleState(
heading=new_heading,
speed=new_speed,
steering=steering
def _update_traffic(self):
for actor in self.traffic_actors:
actor.x += actor.speed * np.cos(actor.heading) * self.dt
actor.y += actor.speed * np.sin(actor.heading) * self.dt
def _get_observation(self) -> Dict:
"ego_state": {
"x": self.ego_vehicle.x,
"y": self.ego_vehicle.y,
"heading": self.ego_vehicle.heading,
"speed": self.ego_vehicle.speed
"traffic": [
"heading": a.heading,
"speed": a.speed,
"type": a.vehicle_type
for a in self.traffic_actors
"lidar": self._simulate_lidar(),
"camera": self._simulate_camera()
def _simulate_lidar(self) -> np.ndarray:
"""模拟激光雷达"""
return np.zeros((10000, 3))
def _simulate_camera(self) -> np.ndarray:
return np.zeros((375, 1242, 3))
def _compute_reward(self) -> float:
reward = 0.0
if self.ego_vehicle.speed > 10:
reward += 0.1
for actor in self.traffic_actors:
dist = np.sqrt(
(actor.x - self.ego_vehicle.x) ** 2 +
(actor.y - self.ego_vehicle.y) ** 2
if dist < 2.0:
reward -= 100.0
return reward
def _check_done(self) -> bool:
"""检查是否结束"""
for actor in self.traffic_actors:
dist = np.sqrt(
(actor.x - self.ego_vehicle.x) ** 2 +
(actor.y - self.ego_vehicle.y) ** 2
if dist < 1.0:
return True
self.ego_vehicle.x < 0 or self.ego_vehicle.x > self.map_size[0] or
self.ego_vehicle.y < 0 or self.ego_vehicle.y > self.map_size[1]
return True
return False
class ScenarioRunner:
"""场景运行器"""
def __init__(self, simulator: DrivingSimulator):
self.simulator = simulator
def run_scenario(
scenario: Dict,
max_steps: int = 1000
scenario: 场景定义
policy_fn: 策略函数 (observation -> control)
max_steps: 最大步数
self.simulator.set_ego(VehicleState(
x=scenario["ego_start"][0],
y=scenario["ego_start"][1],
heading=scenario["ego_start"][2],
for actor_def in scenario.get("traffic", []):
self.simulator.add_traffic(TrafficActor(
vehicle_id=actor_def["id"],
x=actor_def["start"][0],
y=actor_def["start"][1],
heading=actor_def["start"][2],
speed=actor_def.get("speed", 0),
vehicle_type=actor_def.get("type", "car")
total_reward = 0.0
for step in range(max_steps):
obs = self.simulator._get_observation()
control = policy_fn(obs)
result = self.simulator.step(control)
total_reward += result["reward"]
if result["done"]:
"total_reward": total_reward,
"steps": steps,
"success": steps < max_steps
规划与控制
轨迹规划
import numpy as np
from typing import List, Tuple, Optional
class TrajectoryPlanner:
"""轨迹规划器"""
def __init__(self):
self.waypoints = []
def plan_to_goal(
start: Tuple[float, float, float],
goal: Tuple[float, float],
obstacles: List[Tuple[float, float, float, float]] = None
) -> np.ndarray:
start: (x, y, heading)
goal: (x, y)
obstacles: [(x, y, w, h), ...]
# 简化的 A* 规划
if obstacles is None:
obstacles = []
trajectories = self._generate_candidate_trajectories(start, goal)
best_trajectory = self._select_best_trajectory(
trajectories,
return best_trajectory
def _generate_candidate_trajectories(
start: Tuple[float, float, float],
goal: Tuple[float, float]
) -> List[np.ndarray]:
"""生成候选轨迹"""
trajectories = []
for curvature in np.linspace(-0.5, 0.5, 5):
trajectory = self._generate_spline_trajectory(
start, goal, curvature
trajectories.append(trajectory)
return trajectories
def _generate_spline_trajectory(
start: Tuple[float, float, float],
goal: Tuple[float, float],
curvature: float
) -> np.ndarray:
"""生成样条轨迹"""
x0, y0, theta0 = start
xg, yg = goal
num_points = 50
trajectory = []
for i in range(num_points):
t = i / (num_points - 1)
if abs(curvature) < 1e-6:
x = x0 + (xg - x0) * t
y = y0 + (yg - y0) * t
r = 1.0 / curvature
cx = x0 - r * np.sin(theta0)
cy = y0 + r * np.cos(theta0)
start_angle = theta0 + np.pi / 2
end_angle = np.arctan2(yg - cy, xg - cx)
current_angle = start_angle + (end_angle - start_angle) * t
x = cx + r * np.sin(current_angle)
y = cy - r * np.cos(current_angle)
trajectory.append([x, y])
return np.array(trajectory)
def _select_best_trajectory(
trajectories: List[np.ndarray],
obstacles: List[Tuple[float, float, float, float]]
) -> np.ndarray:
"""选择最优轨迹"""
best_cost = float('inf')
best_trajectory = trajectories[0] if trajectories else None
for traj in trajectories:
cost = self._compute_trajectory_cost(traj, obstacles)
if cost < best_cost:
best_cost = cost
best_trajectory = traj
return best_trajectory
def _compute_trajectory_cost(
trajectory: np.ndarray,
obstacles: List[Tuple[float, float, float, float]]
) -> float:
"""计算轨迹代价"""
path_length = np.sum(np.linalg.norm(
np.diff(trajectory, axis=0),
cost += path_length
for obs_x, obs_y, obs_w, obs_h in obstacles:
for point in trajectory:
dist = np.sqrt(
(point[0] - obs_x) ** 2 +
(point[1] - obs_y) ** 2
if dist < 3.0:
cost += 1000.0 * (3.0 - dist)
return cost
class PIDController:
"""PID 控制器"""
def __init__(
kp: float = 1.0,
ki: float = 0.0,
kd: float = 0.0
self.kp = kp
self.ki = ki
self.kd = kd
self.prev_error = 0.0
self.integral = 0.0
def compute(
setpoint: float,
measurement: float,
dt: float = 0.1
) -> float:
setpoint: 设定值
measurement: 测量值
error = setpoint - measurement
self.integral += error * dt
derivative = (error - self.prev_error) / dt
output = self.kp * error + self.ki * self.integral + self.kd * derivative
self.prev_error = error
return output
class LateralController:
"""横向控制器 (Pure Pursuit)"""
def __init__(self, lookahead_distance: float = 5.0):
self.lookahead_distance = lookahead_distance
self.wheelbase = 2.7
def compute_steering(
vehicle_state: Tuple[float, float, float],
trajectory: np.ndarray
) -> float:
vehicle_state: (x, y, heading)
trajectory: (N, 2) 轨迹点
vx, vy, heading = vehicle_state
lookahead_point = self._find_lookahead_point(
dx = lookahead_point[0] - vx
dy = lookahead_point[1] - vy
angle_to_point = np.arctan2(dy, dx)
alpha = angle_to_point - heading
steering = np.arctan(2 * self.wheelbase * np.sin(alpha) / self.lookahead_distance)
return np.clip(steering, -0.5, 0.5)
def _find_lookahead_point(
vehicle_pos: Tuple[float, float],
trajectory: np.ndarray
) -> np.ndarray:
"""找到前瞻点"""
vx, vy = vehicle_pos
# 找到轨迹上距离车一定距离的点
dists = np.linalg.norm(trajectory - np.array([vx, vy]), axis=1)
valid_indices = np.where(dists >= self.lookahead_distance * 0.8)[0]
if len(valid_indices) == 0:
return trajectory[-1]
return trajectory[valid_indices[0]]
class LongitudinalController:
"""纵向控制器"""
def __init__(self):
self.speed_pid = PIDController(kp=1.0, ki=0.1, kd=0.1)
self.desired_speed = 0.0
def compute_throttle_brake(
desired_speed: float,
current_speed: float,
dt: float = 0.1
) -> Tuple[float, float]:
(throttle, brake)
self.desired_speed = desired_speed
control = self.speed_pid.compute(
desired_speed,
current_speed,
if control > 0:
return (min(control, 1.0), 0.0)
return (0.0, min(-control, 1.0))
变现路径
自动驾驶服务变现
1. 自动驾驶仿真平台
- 产品:自动驾驶仿真 SaaS
- 内容:场景仿真、数据回放
- 收益:按仿真时长计费
- 产品:3D 点云标注平台
- 内容:目标检测、车道线标注
- 收益:按标注量计费
3. ADAS 算法模块
- 产品:车道保持、自动泊车算法
- 内容:视觉/LiDAR 算法
- 收益:技术授权
- 产品:自动驾驶课程
- 内容:算法、仿真、实车
- 收益:课程销售
- 产品:商用车队管理
- 内容:监控、调度、路径规划
- 收益:SaaS 订阅
6. Robotaxi 服务
- 产品:无人出租车服务
- 内容:城市运营
- 收益:按里程计费
总结
占用网络:3D 空间表示,Tesla 核心
BEV 视角:多视角统一到鸟瞰图
端到端 UniAD:感知→预测→规划一体化
规划控制:Pure Pursuit、Stanley、PID
仿真测试:场景仿真、安全测试
数据管道:点云、图像、多传感器融合
地图感知:车道线、边界、语义地图
运动预测:多智能体轨迹预测
安全验证:仿真回放、脱离报告
变现模式:仿真平台、数据标注、技术授权
本文是自动驾驶系列之一。
This article contains affiliate links. If you sign up through the links above, I may earn a commission at no additional cost to you.
Ready to Build Your AI Business?
Get started with Systeme.io for free — All-in-one platform for building your online business with AI tools.
Top comments (0)