DEV Community

ZNY
ZNY

Posted on

AI Tesla FSDWaymo

AI 自动驾驶与端到端深度学习完全指南:Tesla FSD、Waymo、端到端方案实战

前言

自动驾驶正在从"模块化"走向"端到端"。Tesla FSD V12 证明了纯视觉端到端方案的可行性,Waymo 则在 Robotaxi 领域持续深耕。2026年,端到端自动驾驶已经成为行业共识。

自动驾驶技术概述


自动驾驶技术架构对比:

1. 模块化方案 (传统)

- 感知 → 规划 → 控制

- 优点:可解释性强、易调试

- 限制:误差累积、规则复杂

- 代表:大多数传统车企

2. 端到端方案 (Tesla FSD V12)

- 传感器输入 → 驾驶输出

- 优点:无误差累积、类人驾驶

- 限制:可解释性差、需大量数据

- 代表:Tesla、Wayve

- 端到端 + 规则兜底

- 优点:平衡安全与性能

- 代表:华为、小鹏

4. 多模态大模型方案

- VLM + 驾驶策略

- 优点:泛化能力强

- 代表:DriveGPT、UniAD

Enter fullscreen mode Exit fullscreen mode

Tesla FSD 架构

FSD 核心实现


import torch

import torch.nn as nn

from typing import Dict, List, Tuple, Optional

import numpy as np

class OccupancyNetwork:

"""占用网络 (Occupancy Network)"""

def __init__(self, voxel_size: float = 0.1):

self.voxel_size = voxel_size

self.encoder = None

self.decoder = None

def build_encoder(self, input_channels: int = 12):

"""构建编码器"""

self.encoder = nn.Sequential(

nn.Conv3d(input_channels, 64, kernel_size=3, padding=1),

nn.BatchNorm3d(64),

nn.ReLU(inplace=True),

nn.MaxPool3d(2),

nn.Conv3d(64, 128, kernel_size=3, padding=1),

nn.BatchNorm3d(128),

nn.ReLU(inplace=True),

nn.MaxPool3d(2),

nn.Conv3d(128, 256, kernel_size=3, padding=1),

nn.BatchNorm3d(256),

nn.ReLU(inplace=True),

def voxelize(

points: np.ndarray,

grid_range: Tuple[float, float, float] = (-50, 50, -50, 50, -5, 5)

) -> torch.Tensor:

points: (N, 3) 点云

grid_range: (x_min, x_max, y_min, y_max, z_min, z_max)

x_min, x_max, y_min, y_max, z_min, z_max = grid_range

voxel_x = ((points[:, 0] - x_min) / self.voxel_size).astype(int)

voxel_y = ((points[:, 1] - y_min) / self.voxel_size).astype(int)

voxel_z = ((points[:, 2] - z_min) / self.voxel_size).astype(int)

(voxel_x >= 0) & (voxel_x < int((x_max - x_min) / self.voxel_size)) &

(voxel_y >= 0) & (voxel_y < int((y_max - y_min) / self.voxel_size)) &

(voxel_z >= 0) & (voxel_z < int((z_max - z_min) / self.voxel_size))

return points[valid], voxel_x[valid], voxel_y[valid], voxel_z[valid]

def forward(self, points: np.ndarray) -> Dict:

valid_points, vx, vy, vz = self.voxelize(points)

grid_size = (

int(100 / self.voxel_size),

int(100 / self.voxel_size),

int(10 / self.voxel_size)

voxel_grid = torch.zeros(grid_size, dtype=torch.float32)

# 填充体素 (简化为密度)

for i in range(len(valid_points)):

voxel_grid[vx[i], vy[i], vz[i]] += 1

voxel_tensor = voxel_grid.unsqueeze(0).unsqueeze(0)  # (1, 1, D, H, W)

features = self.encoder(voxel_tensor)

"occupancy": features,

"points": valid_points

class BirdEyeView:

"""鸟瞰图 (BEV) 特征提取"""

def __init__(self, feature_dim: int = 256):

self.feature_dim = feature_dim

def build_bev(

multi_scale_features: List[torch.Tensor],

target_size: Tuple[int, int] = (200, 200)

) -> torch.Tensor:

multi_scale_features: 多尺度特征列表

target_size: (H, W)

fused = torch.zeros(

1, self.feature_dim, target_size[0], target_size[1]

for feat in multi_scale_features:

feat_up = torch.nn.functional.interpolate(

size=target_size,

mode='bilinear',

align_corners=False

fused += feat_up

return fused

def bev_to_image_coords(

bev_coords: np.ndarray,

origin: Tuple[float, float] = (0, 0),

resolution: float = 0.1

) -> np.ndarray:

"""BEV 坐标转图像坐标"""

x, y = bev_coords[:, 0], bev_coords[:, 1]

img_x = ((x - origin[0]) / resolution).astype(int)

img_y = ((y - origin[1]) / resolution).astype(int)

return np.stack([img_x, img_y], axis=-1)

class PlanningNetwork(nn.Module):

def __init__(self, input_dim: int = 512, hidden_dim: int = 256):

super().__init__()

self.planner = nn.Sequential(

nn.Linear(input_dim, hidden_dim),

nn.ReLU(inplace=True),

nn.Dropout(0.1),

nn.Linear(hidden_dim, hidden_dim),

nn.ReLU(inplace=True),

nn.Dropout(0.1),

nn.Linear(hidden_dim, 2),  # (steering, throttle)

def forward(

bev_features: torch.Tensor,

route_features: torch.Tensor,

traffic_features: torch.Tensor

) -> Dict[str, torch.Tensor]:

bev_features: BEV 特征

route_features: 路由特征

traffic_features: 交通状态特征

{"trajectory": ..., "control": ...}

bev_flat = bev_features.flatten(1)

combined = torch.cat([

route_features,

traffic_features

trajectory = self.planner(combined)

control = torch.tanh(trajectory)

"trajectory": trajectory,

"control": control

class FSDStackedHourglass(nn.Module):

"""FSD 沙漏网络"""

def __init__(self, num_joints: int = 2):

super().__init__()

self.num_joints = num_joints

self.encoder = nn.Sequential(

nn.Conv2d(12, 64, kernel_size=7, stride=2, padding=3),

nn.BatchNorm2d(64),

nn.ReLU(inplace=True),

nn.MaxPool2d(2),

self._make_layer(64, 128),

self._make_layer(128, 128),

self._make_layer(128, 256),

self.hourglass = self._make_hourglass(256, num_joints)

def _make_layer(self, in_ch: int, out_ch: int) -> nn.Module:

return nn.Sequential(

nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),

nn.BatchNorm2d(out_ch),

nn.ReLU(inplace=True),

nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1),

nn.BatchNorm2d(out_ch),

nn.ReLU(inplace=True),

def _make_hourglass(self, channels: int, num_joints: int) -> nn.Module:

"""构建沙漏模块"""

return nn.Sequential(

self._make_layer(channels, channels),

self._make_layer(channels, channels),

nn.MaxPool2d(2),

self._make_layer(channels, channels * 2),

self._make_layer(channels * 2, channels * 2),

nn.MaxPool2d(2),

self._make_layer(channels * 2, channels * 2),

nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False),

self._make_layer(channels * 2, channels * 2),

nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False),

nn.Conv2d(channels * 2, num_joints, kernel_size=1),

def forward(self, x: torch.Tensor) -> List[torch.Tensor]:

encoded = self.encoder(x)

outputs = self.hourglass(encoded)

return outputs

Enter fullscreen mode Exit fullscreen mode

端到端自动驾驶

UniAD 架构


class UniAD(nn.Module):

"""UniAD 端到端自动驾驶"""

def __init__(self, config: Dict):

super().__init__()

self.config = config

self.backbone = self._build_backbone()

self.neck = self._build_neck()

self.bev_encoder = BEVEncoder(

in_channels=512,

out_channels=config.get("bev_channels", 256)

self.map_head = MapHead(config)

self.detection_head = DetectionHead(config)

self.motion_head = MotionHead(config)

self.planning_head = PlanningHead(config)

def _build_backbone(self) -> nn.Module:

"""构建骨干网络"""

return nn.Sequential(

nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),

nn.BatchNorm2d(64),

nn.ReLU(inplace=True),

nn.MaxPool2d(3, stride=2, padding=1),

# ResNet blocks

self._make_resblock(64, 64, num_blocks=3),

self._make_resblock(64, 128, num_blocks=4, stride=2),

self._make_resblock(128, 256, num_blocks=6, stride=2),

self._make_resblock(256, 512, num_blocks=3, stride=2),

def _make_resblock(

in_ch: int,

out_ch: int,

num_blocks: int,

stride: int = 1

) -> nn.Module:

"""ResNet block"""

layers = []

layers.append(nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=stride, padding=1))

layers.append(nn.BatchNorm2d(out_ch))

layers.append(nn.ReLU(inplace=True))

for _ in range(num_blocks - 1):

layers.append(nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1))

layers.append(nn.BatchNorm2d(out_ch))

layers.append(nn.ReLU(inplace=True))

return nn.Sequential(*layers)

def _build_neck(self) -> nn.Module:

"""构建 Neck (FPN)"""

return nn.Sequential(

nn.Conv2d(512, 256, kernel_size=1),

nn.BatchNorm2d(256),

nn.ReLU(inplace=True),

nn.Conv2d(256, 256, kernel_size=3, padding=1),

nn.BatchNorm2d(256),

nn.ReLU(inplace=True),

def forward(

images: torch.Tensor,

intrinsics: torch.Tensor,

extrinsics: torch.Tensor

) -> Dict[str, torch.Tensor]:

images: (B, N, 3, H, W) 多视角图像

intrinsics: (B, N, 3, 3) 内参

extrinsics: (B, N, 4, 4) 外参

"detection": {...},

"tracking": {...},

"map": {...},

"motion": {...},

"planning": {...}

B, N, C, H, W = images.shape

multi_view_features = []

for i in range(N):

img = images[:, i]  # (B, 3, H, W)

feat = self.backbone(img)  # (B, 512, H', W')

feat = self.neck(feat)

multi_view_features.append(feat)

# 视角融合 + BEV

bev_features = self.bev_encoder(

multi_view_features,

intrinsics,

detection = self.detection_head(bev_features)

tracking = self.detection_head(bev_features)  # 简化

map_pred = self.map_head(bev_features)

motion = self.motion_head(

bev_features,

detection["boxes"],

map_pred["lanes"]

planning = self.planning_head(

bev_features,

motion["trajectories"]

"detection": detection,

"tracking": tracking,

"map": map_pred,

"motion": motion,

"planning": planning

class BEVEncoder(nn.Module):

"""BEV 编码器"""

def __init__(self, in_channels: int, out_channels: int):

super().__init__()

self.encoder = nn.Sequential(

nn.Conv2d(in_channels, 128, kernel_size=3, padding=1),

nn.BatchNorm2d(128),

nn.ReLU(inplace=True),

nn.Conv2d(128, out_channels, kernel_size=3, padding=1),

nn.BatchNorm2d(out_channels),

nn.ReLU(inplace=True),

def forward(

multi_view_features: List[torch.Tensor],

intrinsics: torch.Tensor,

extrinsics: torch.Tensor

) -> torch.Tensor:

multi_view_features: 多视角特征列表

intrinsics: 内参

extrinsics: 外参

# 实际使用 transformer 进行视角变换

fused = torch.stack(multi_view_features, dim=1).mean(dim=1)  # (B, C, H, W)

bev = self.encoder(fused)

class MapHead(nn.Module):

"""地图感知头"""

def __init__(self, config: Dict):

super().__init__()

self.lane_head = LaneDetectionHead(config)

self.segmentation_head = MapSegmentationHead(config)

def forward(self, bev_features: torch.Tensor) -> Dict[str, torch.Tensor]:

lanes = self.lane_head(bev_features)

segmentation = self.segmentation_head(bev_features)

"lanes": lanes,

"segmentation": segmentation

class LaneDetectionHead(nn.Module):

"""车道线检测头"""

def __init__(self, config: Dict):

super().__init__()

self.head = nn.Sequential(

nn.Conv2d(256, 128, kernel_size=3, padding=1),

nn.BatchNorm2d(128),

nn.ReLU(inplace=True),

nn.Conv2d(128, 64, kernel_size=3, padding=1),

nn.BatchNorm2d(64),

nn.ReLU(inplace=True),

self.confidence = nn.Conv2d(64, 1, kernel_size=1)

self.offset = nn.Conv2d(64, 2, kernel_size=1)  # 偏移

def forward(self, x: torch.Tensor) -> Dict[str, torch.Tensor]:

feat = self.head(x)

confidence = torch.sigmoid(self.confidence(feat))

offset = self.offset(feat)

"confidence": confidence,

"offset": offset

class MotionHead(nn.Module):

"""运动预测头"""

def __init__(self, config: Dict):

super().__init__()

self.num_agents = config.get("num_agents", 64)

self.future_frames = config.get("future_frames", 40)

self.motion_encoder = nn.GRU(

input_size=256,

hidden_size=256,

num_layers=2,

batch_first=True

self.trajectory_head = nn.Linear(256, self.future_frames * 2)

def forward(

bev_features: torch.Tensor,

detection_boxes: torch.Tensor,

map_lanes: torch.Tensor

) -> Dict[str, torch.Tensor]:

detection_boxes: (B, N, 5) [x, y, w, h, angle]

map_lanes: 地图信息

B = bev_features.shape[0]

# 简化:用检测框特征

agent_features = detection_boxes.flatten(1)  # (B, N*5)

encoded, _ = self.motion_encoder(agent_features)

trajectories = self.trajectory_head(encoded)

trajectories = trajectories.view(B, -1, self.future_frames, 2)

"trajectories": trajectories,

"probabilities": torch.softmax(torch.randn(B, self.num_agents), dim=-1)

class PlanningHead(nn.Module):

def __init__(self, config: Dict):

super().__init__()

self.planner = nn.Sequential(

nn.Linear(512, 256),

nn.ReLU(inplace=True),

nn.Linear(256, 128),

nn.ReLU(inplace=True),

nn.Linear(128, 2),  # (x, y) 轨迹点

def forward(

bev_features: torch.Tensor,

motion_trajectories: torch.Tensor

) -> Dict[str, torch.Tensor]:

motion_trajectories: (B, N, T, 2) 预测轨迹

B = bev_features.shape[0]

bev_flat = bev_features.flatten(1)

motion_flat = motion_trajectories.flatten(1)

combined = torch.cat([bev_flat, motion_flat], dim=-1)

planning = self.planner(combined)

"trajectory": planning.view(B, -1, 2),  # (B, T, 2)

"waypoints": planning.view(B, -1, 2)

Enter fullscreen mode Exit fullscreen mode

数据处理

自动驾驶数据管道


import numpy as np

from typing import Dict, List, Tuple, Optional

import pickle

class DatasetLoader:

"""数据加载器"""

def __init__(

data_root: str,

split: str = "train"

self.data_root = data_root

self.split = split

self.scene_ids = self._load_scene_list()

def _load_scene_list(self) -> List[str]:

"""加载场景列表"""

# 简化:从文件系统读取

def load_frame(self, scene_id: str, frame_id: int) -> Dict:

"""加载单帧数据"""

camera_data = self._load_camera(scene_id, frame_id)

lidar_data = self._load_lidar(scene_id, frame_id)

canbus_data = self._load_canbus(scene_id, frame_id)

annotation = self._load_annotation(scene_id, frame_id)

"cameras": camera_data,

"lidar": lidar_data,

"canbus": canbus_data,

"annotation": annotation

def _load_camera(self, scene_id: str, frame_id: int) -> Dict[str, np.ndarray]:

"""加载相机数据"""

def _load_lidar(self, scene_id: str, frame_id: int) -> np.ndarray:

"""加载激光雷达数据"""

return np.zeros((10000, 4))

def _load_canbus(self, scene_id: str, frame_id: int) -> Dict:

"""加载车辆 CAN 总线数据"""

"speed": 0.0,

"steering": 0.0,

"acceleration": 0.0,

"heading": 0.0

def _load_annotation(self, scene_id: str, frame_id: int) -> Dict:

"boxes_3d": [],  # 3D 检测框

"track_ids": [],

"traffic_lights": [],

"lanes": []

class DataAugmentation:

@staticmethod

def random_flip(

image: np.ndarray,

boxes: np.ndarray,

prob: float = 0.5

) -> Tuple[np.ndarray, np.ndarray]:

"""随机水平翻转"""

if np.random.random() < prob:

image = np.flip(image, axis=1).copy()

if len(boxes) > 0:

boxes[:, [0, 2]] = image.shape[1] - boxes[:, [2, 0]]

return image, boxes

@staticmethod

def random_scaling(

image: np.ndarray,

scale_range: Tuple[float, float] = (0.9, 1.1)

) -> np.ndarray:

scale = np.random.uniform(*scale_range)

from scipy.ndimage import zoom

h, w = image.shape[:2]

scaled_h = int(h * scale)

scaled_w = int(w * scale)

scaled = zoom(image, (scaled_h/h, scaled_w/w, 1))

# 裁剪或填充到原尺寸

result = np.zeros_like(image)

if scale >= 1:

start_h = (scaled_h - h) // 2

start_w = (scaled_w - w) // 2

result = scaled[start_h:start_h+h, start_w:start_w+w]

start_h = (h - scaled_h) // 2

start_w = (w - scaled_w) // 2

result[start_h:start_h+scaled_h, start_w:start_w+scaled_w] = scaled

return result

@staticmethod

def random_rotation(

image: np.ndarray,

angle_range: Tuple[float, float] = (-15, 15)

) -> np.ndarray:

from scipy.ndimage import rotate

angle = np.random.uniform(*angle_range)

rotated = rotate(image, angle, reshape=False)

return rotated

class BEVGenerator:

"""BEV 视角生成器"""

def __init__(

x_range: Tuple[float, float] = (-50, 50),

y_range: Tuple[float, float] = (-50, 50),

z_range: Tuple[float, float] = (-5, 5),

resolution: float = 0.1

self.x_range = x_range

self.y_range = y_range

self.z_range = z_range

self.resolution = resolution

self.grid_size = (

int((x_range[1] - x_range[0]) / resolution),

int((y_range[1] - y_range[0]) / resolution),

int((z_range[1] - z_range[0]) / resolution)

def points_to_bev(

points: np.ndarray,

intensity: np.ndarray = None

) -> np.ndarray:

points: (N, 3) [x, y, z]

intensity: (N,) 反射强度

(C, H, W) BEV 特征图

# 初始化网格 (使用反射强度作为通道)

height_channels = self.grid_size[2]

channels = height_channels + (1 if intensity is not None else 0)

bev = np.zeros((channels, self.grid_size[0], self.grid_size[1]))

x_idx = ((points[:, 0] - self.x_range[0]) / self.resolution).astype(int)

y_idx = ((points[:, 1] - self.y_range[0]) / self.resolution).astype(int)

z_idx = ((points[:, 2] - self.z_range[0]) / self.resolution).astype(int)

(x_idx >= 0) & (x_idx < self.grid_size[0]) &

(y_idx >= 0) & (y_idx < self.grid_size[1]) &

(z_idx >= 0) & (z_idx < self.grid_size[2])

x_idx = x_idx[valid]

y_idx = y_idx[valid]

z_idx = z_idx[valid]

for i in range(len(x_idx)):

if z_idx[i] >= 0 and z_idx[i] < self.grid_size[2]:

bev[z_idx[i], x_idx[i], y_idx[i]] = 1.0

if intensity is not None:

intensity_valid = intensity[valid]

bev[-1, x_idx, y_idx] = intensity_valid

Enter fullscreen mode Exit fullscreen mode

仿真与测试

自动驾驶仿真


import numpy as np

from typing import Dict, List, Tuple

from dataclasses import dataclass

class VehicleState:

heading: float

speed: float

steering: float

class TrafficActor:

"""交通参与者"""

vehicle_id: str

heading: float

speed: float

vehicle_type: str  # car, pedestrian, cyclist

class DrivingSimulator:

"""驾驶仿真器"""

def __init__(

dt: float = 0.1,

map_size: Tuple[float, float] = (200, 200)

self.dt = dt

self.map_size = map_size

self.ego_vehicle = None

self.traffic_actors: List[TrafficActor] = []

self.traffic_lights = []

def set_ego(self, state: VehicleState):

"""设置自车状态"""

self.ego_vehicle = state

def add_traffic(self, actor: TrafficActor):

"""添加交通参与者"""

self.traffic_actors.append(actor)

def step(self, control: Dict[str, float]) -> Dict:

control: {"steering": ..., "throttle": ..., "brake": ...}

{"observation": ..., "reward": ..., "done": ...}

self._update_ego(control)

self._update_traffic()

observation = self._get_observation()

reward = self._compute_reward()

done = self._check_done()

"observation": observation,

"reward": reward,

"done": done

def _update_ego(self, control: Dict[str, float]):

if self.ego_vehicle is None:

wheelbase = 2.7  # 轴距

steering = control.get("steering", 0)

throttle = control.get("throttle", 0)

brake = control.get("brake", 0)

speed = self.ego_vehicle.speed

acceleration = throttle * 2.0 - brake * 5.0

new_speed = speed + acceleration * self.dt

new_speed = max(0, new_speed)  # 不倒车

heading = self.ego_vehicle.heading

new_heading = heading + new_speed / wheelbase * np.tan(steering) * self.dt

new_x = self.ego_vehicle.x + new_speed * np.cos(new_heading) * self.dt

new_y = self.ego_vehicle.y + new_speed * np.sin(new_heading) * self.dt

self.ego_vehicle = VehicleState(

heading=new_heading,

speed=new_speed,

steering=steering

def _update_traffic(self):

for actor in self.traffic_actors:

actor.x += actor.speed * np.cos(actor.heading) * self.dt

actor.y += actor.speed * np.sin(actor.heading) * self.dt

def _get_observation(self) -> Dict:

"ego_state": {

"x": self.ego_vehicle.x,

"y": self.ego_vehicle.y,

"heading": self.ego_vehicle.heading,

"speed": self.ego_vehicle.speed

"traffic": [

"heading": a.heading,

"speed": a.speed,

"type": a.vehicle_type

for a in self.traffic_actors

"lidar": self._simulate_lidar(),

"camera": self._simulate_camera()

def _simulate_lidar(self) -> np.ndarray:

"""模拟激光雷达"""

return np.zeros((10000, 3))

def _simulate_camera(self) -> np.ndarray:

return np.zeros((375, 1242, 3))

def _compute_reward(self) -> float:

reward = 0.0

if self.ego_vehicle.speed > 10:

reward += 0.1

for actor in self.traffic_actors:

dist = np.sqrt(

(actor.x - self.ego_vehicle.x) ** 2 +

(actor.y - self.ego_vehicle.y) ** 2

if dist < 2.0:

reward -= 100.0

return reward

def _check_done(self) -> bool:

"""检查是否结束"""

for actor in self.traffic_actors:

dist = np.sqrt(

(actor.x - self.ego_vehicle.x) ** 2 +

(actor.y - self.ego_vehicle.y) ** 2

if dist < 1.0:

return True

self.ego_vehicle.x < 0 or self.ego_vehicle.x > self.map_size[0] or

self.ego_vehicle.y < 0 or self.ego_vehicle.y > self.map_size[1]

return True

return False

class ScenarioRunner:

"""场景运行器"""

def __init__(self, simulator: DrivingSimulator):

self.simulator = simulator

def run_scenario(

scenario: Dict,

max_steps: int = 1000

scenario: 场景定义

policy_fn: 策略函数 (observation -> control)

max_steps: 最大步数

self.simulator.set_ego(VehicleState(

x=scenario["ego_start"][0],

y=scenario["ego_start"][1],

heading=scenario["ego_start"][2],

for actor_def in scenario.get("traffic", []):

self.simulator.add_traffic(TrafficActor(

vehicle_id=actor_def["id"],

x=actor_def["start"][0],

y=actor_def["start"][1],

heading=actor_def["start"][2],

speed=actor_def.get("speed", 0),

vehicle_type=actor_def.get("type", "car")

total_reward = 0.0

for step in range(max_steps):

obs = self.simulator._get_observation()

control = policy_fn(obs)

result = self.simulator.step(control)

total_reward += result["reward"]

if result["done"]:

"total_reward": total_reward,

"steps": steps,

"success": steps < max_steps

Enter fullscreen mode Exit fullscreen mode

规划与控制

轨迹规划


import numpy as np

from typing import List, Tuple, Optional

class TrajectoryPlanner:

"""轨迹规划器"""

def __init__(self):

self.waypoints = []

def plan_to_goal(

start: Tuple[float, float, float],

goal: Tuple[float, float],

obstacles: List[Tuple[float, float, float, float]] = None

) -> np.ndarray:

start: (x, y, heading)

goal: (x, y)

obstacles: [(x, y, w, h), ...]

# 简化的 A* 规划

if obstacles is None:

obstacles = []

trajectories = self._generate_candidate_trajectories(start, goal)

best_trajectory = self._select_best_trajectory(

trajectories,

return best_trajectory

def _generate_candidate_trajectories(

start: Tuple[float, float, float],

goal: Tuple[float, float]

) -> List[np.ndarray]:

"""生成候选轨迹"""

trajectories = []

for curvature in np.linspace(-0.5, 0.5, 5):

trajectory = self._generate_spline_trajectory(

start, goal, curvature

trajectories.append(trajectory)

return trajectories

def _generate_spline_trajectory(

start: Tuple[float, float, float],

goal: Tuple[float, float],

curvature: float

) -> np.ndarray:

"""生成样条轨迹"""

x0, y0, theta0 = start

xg, yg = goal

num_points = 50

trajectory = []

for i in range(num_points):

t = i / (num_points - 1)

if abs(curvature) < 1e-6:

x = x0 + (xg - x0) * t

y = y0 + (yg - y0) * t

r = 1.0 / curvature

cx = x0 - r * np.sin(theta0)

cy = y0 + r * np.cos(theta0)

start_angle = theta0 + np.pi / 2

end_angle = np.arctan2(yg - cy, xg - cx)

current_angle = start_angle + (end_angle - start_angle) * t

x = cx + r * np.sin(current_angle)

y = cy - r * np.cos(current_angle)

trajectory.append([x, y])

return np.array(trajectory)

def _select_best_trajectory(

trajectories: List[np.ndarray],

obstacles: List[Tuple[float, float, float, float]]

) -> np.ndarray:

"""选择最优轨迹"""

best_cost = float('inf')

best_trajectory = trajectories[0] if trajectories else None

for traj in trajectories:

cost = self._compute_trajectory_cost(traj, obstacles)

if cost < best_cost:

best_cost = cost

best_trajectory = traj

return best_trajectory

def _compute_trajectory_cost(

trajectory: np.ndarray,

obstacles: List[Tuple[float, float, float, float]]

) -> float:

"""计算轨迹代价"""

path_length = np.sum(np.linalg.norm(

np.diff(trajectory, axis=0),

cost += path_length

for obs_x, obs_y, obs_w, obs_h in obstacles:

for point in trajectory:

dist = np.sqrt(

(point[0] - obs_x) ** 2 +

(point[1] - obs_y) ** 2

if dist < 3.0:

cost += 1000.0 * (3.0 - dist)

return cost

class PIDController:

"""PID 控制器"""

def __init__(

kp: float = 1.0,

ki: float = 0.0,

kd: float = 0.0

self.kp = kp

self.ki = ki

self.kd = kd

self.prev_error = 0.0

self.integral = 0.0

def compute(

setpoint: float,

measurement: float,

dt: float = 0.1

) -> float:

setpoint: 设定值

measurement: 测量值

error = setpoint - measurement

self.integral += error * dt

derivative = (error - self.prev_error) / dt

output = self.kp * error + self.ki * self.integral + self.kd * derivative

self.prev_error = error

return output

class LateralController:

"""横向控制器 (Pure Pursuit)"""

def __init__(self, lookahead_distance: float = 5.0):

self.lookahead_distance = lookahead_distance

self.wheelbase = 2.7

def compute_steering(

vehicle_state: Tuple[float, float, float],

trajectory: np.ndarray

) -> float:

vehicle_state: (x, y, heading)

trajectory: (N, 2) 轨迹点

vx, vy, heading = vehicle_state

lookahead_point = self._find_lookahead_point(

dx = lookahead_point[0] - vx

dy = lookahead_point[1] - vy

angle_to_point = np.arctan2(dy, dx)

alpha = angle_to_point - heading

steering = np.arctan(2 * self.wheelbase * np.sin(alpha) / self.lookahead_distance)

return np.clip(steering, -0.5, 0.5)

def _find_lookahead_point(

vehicle_pos: Tuple[float, float],

trajectory: np.ndarray

) -> np.ndarray:

"""找到前瞻点"""

vx, vy = vehicle_pos

# 找到轨迹上距离车一定距离的点

dists = np.linalg.norm(trajectory - np.array([vx, vy]), axis=1)

valid_indices = np.where(dists >= self.lookahead_distance * 0.8)[0]

if len(valid_indices) == 0:

return trajectory[-1]

return trajectory[valid_indices[0]]

class LongitudinalController:

"""纵向控制器"""

def __init__(self):

self.speed_pid = PIDController(kp=1.0, ki=0.1, kd=0.1)

self.desired_speed = 0.0

def compute_throttle_brake(

desired_speed: float,

current_speed: float,

dt: float = 0.1

) -> Tuple[float, float]:

(throttle, brake)

self.desired_speed = desired_speed

control = self.speed_pid.compute(

desired_speed,

current_speed,

if control > 0:

return (min(control, 1.0), 0.0)

return (0.0, min(-control, 1.0))

Enter fullscreen mode Exit fullscreen mode

变现路径

自动驾驶服务变现


1. 自动驾驶仿真平台

- 产品:自动驾驶仿真 SaaS

- 内容:场景仿真、数据回放

- 收益:按仿真时长计费

- 产品:3D 点云标注平台

- 内容:目标检测、车道线标注

- 收益:按标注量计费

3. ADAS 算法模块

- 产品:车道保持、自动泊车算法

- 内容:视觉/LiDAR 算法

- 收益:技术授权

- 产品:自动驾驶课程

- 内容:算法、仿真、实车

- 收益:课程销售

- 产品:商用车队管理

- 内容:监控、调度、路径规划

- 收益:SaaS 订阅

6. Robotaxi 服务

- 产品:无人出租车服务

- 内容:城市运营

- 收益:按里程计费

Enter fullscreen mode Exit fullscreen mode

总结

  1. 占用网络:3D 空间表示,Tesla 核心

  2. BEV 视角:多视角统一到鸟瞰图

  3. 端到端 UniAD:感知→预测→规划一体化

  4. 规划控制:Pure Pursuit、Stanley、PID

  5. 仿真测试:场景仿真、安全测试

  6. 数据管道:点云、图像、多传感器融合

  7. 地图感知:车道线、边界、语义地图

  8. 运动预测:多智能体轨迹预测

  9. 安全验证:仿真回放、脱离报告

  10. 变现模式:仿真平台、数据标注、技术授权

本文是自动驾驶系列之一。


This article contains affiliate links. If you sign up through the links above, I may earn a commission at no additional cost to you.

Ready to Build Your AI Business?

Get started with Systeme.io for free — All-in-one platform for building your online business with AI tools.

Top comments (0)