Human-Aligned Decision Transformers for circular manufacturing supply chains with embodied agent feedback loops
Introduction: A Learning Journey from Linear to Circular Intelligence
My journey into this fascinating intersection of AI and sustainable manufacturing began not with a grand theory, but with a frustrating debugging session. I was working on a reinforcement learning agent designed to optimize a traditional linear supply chain—maximize throughput, minimize cost. The agent, a sophisticated PPO model, was performing brilliantly on the metrics we had defined. Yet, during a demo for a client in the electronics manufacturing sector, a simple question derailed everything: "But what happens to all this plastic housing when the product reaches end-of-life? Your model just ships it to landfill."
In that moment, I realized the profound limitation of our approach. We were optimizing a broken, linear system. The metrics were wrong. The objective was misaligned. This sparked a months-long research and experimentation phase where I dove into circular economy principles, studied the emerging field of offline reinforcement learning, and began prototyping agents that could reason about entire material lifecycles. Through studying papers on Decision Transformers and human-in-the-loop RL, I learned that the key wasn't just better optimization, but a fundamental re-alignment of the AI's objective function with multi-stakeholder, long-term human values. This article is a synthesis of that exploration—a technical blueprint for building AI systems that don't just make supply chains efficient, but make them circular, regenerative, and human-aligned.
Technical Background: The Pillars of Circular AI
Before diving into the implementation, it's crucial to understand the three converging domains that form the foundation of this approach.
1. Decision Transformers (DTs): Unlike traditional RL algorithms that learn a policy through trial-and-error reward maximization, Decision Transformers frame sequential decision-making as a conditional sequence modeling problem. As I experimented with offline RL datasets, I found that DTs, built on transformer architectures, excel at leveraging historical trajectory data. They take a desired return (the return-to-go), past states, and actions, and autoregressively predict future actions. This "return-conditioned" behavior is a natural fit for circular systems, where we want to specify how circular we aim to be (e.g., "achieve 95% material recovery") and have the agent figure out the sequence of actions to get there.
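To make that conditioning concrete, here is a tiny sketch (with made-up reward numbers) of how returns-to-go are computed from a logged trajectory, and how the first entry becomes the knob we turn at deployment:

# Per-step circularity rewards from one logged trajectory (illustrative values)
rewards = [0.10, 0.20, 0.25, 0.30]

# Return-to-go at step t is the sum of rewards from t onward
returns_to_go = [round(sum(rewards[t:]), 2) for t in range(len(rewards))]
print(returns_to_go)  # [0.85, 0.75, 0.55, 0.3]

# At deployment we have no future rewards, so we simply set the initial
# return-to-go to the circularity target we want the agent to hit, e.g. 0.95.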
2. Human-Alignment & Inverse Reward Design: A core insight from my research into AI safety is that we rarely can specify a perfect reward function. In circular manufacturing, the "reward" is a complex, often contradictory blend of economic cost, carbon footprint, social impact, and material purity. Human-alignment techniques, inspired by work like Cooperative Inverse Reinforcement Learning (CIRL), don't just learn to maximize a proxy reward. They learn a model of human preferences and uncertainties. During my investigation, I implemented a Bayesian approach where the agent maintains a posterior distribution over possible true reward functions based on sparse human feedback.
3. Embodied Agent Feedback Loops: This is where the digital meets the physical. A supply chain isn't just software—it involves robots, sorting arms, sensors, and logistics vehicles. An embodied agent is one that perceives and acts within a physical environment. The feedback loop is critical: actions (e.g., "disassemble product X") change the physical state, which is measured (e.g., "component Y is damaged"), and this new state informs the next decision. My exploration of robotic simulation platforms like NVIDIA Isaac Sim revealed the challenge of sim-to-real transfer for delicate disassembly tasks, which directly informed the architecture's design.
Architecture: The Core System Design
The proposed system is a multi-agent hierarchy where a central Orchestrator DT makes high-level strategic decisions (e.g., route this batch of returned smartphones to refurbishment vs. recycling), and multiple Embodied Worker Agents execute physical tasks, providing continuous feedback.
┌─────────────────────────────────────────────────────────────┐
│                 Human Preference Interface                  │
│  (Sparse feedback, constraint specification, value tuning)  │
└──────────────────────┬──────────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────────┐
│              Orchestrator Decision Transformer              │
│  - Models full supply chain graph state                     │
│  - Return-conditioned on circularity KPIs                   │
│  - Outputs high-level work orders & routes                  │
└──────────────────────┬──────────────────────────────────────┘
                       │
       ┌───────────────┼───────────────┬───────────────┐
       │               │               │               │
┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ Disassembly │ │   Sorting   │ │  Refurbish  │ │  Logistics  │
│    Agent    │ │    Agent    │ │    Agent    │ │    Agent    │
│  (Robotic)  │ │  (Vision)   │ │   (Mixed)   │ │   (Fleet)   │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
       │               │               │               │
       └───────────────┼───────────────┼───────────────┘
                       │               │
┌──────────────────────▼───────────────▼──────────────────────┐
│               Physical World & Sensor Network               │
│   (Returned products, components, bins, transport lines)    │
└─────────────────────────────────────────────────────────────┘
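Concretely, the "high-level work orders" flowing down this hierarchy can be simple keyed records. Here is a sketch consistent with the worker-agent code later in the article; the fields beyond product_id and goal are illustrative rather than a fixed schema:

# Example work order the Orchestrator might hand to the Disassembly Agent
work_order = {
    'product_id': 'phone_x',
    'route': 'refurbish',           # chosen over 'recycle' for this batch
    'goal': 'extract_battery',
    'target_recovery': 0.95,        # the circularity return the DT was conditioned on
}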
Implementation Detail 1: The Orchestrator Decision Transformer
The Orchestrator is the brain. It operates on a temporal graph representation of the supply chain. My experimentation showed that a Graph Neural Network (GNN) encoder before the transformer was essential to capture relational data (e.g., which refurbishment center is near which recycling hub); a sketch of such an encoder follows the DT code below.
Here's a simplified PyTorch implementation of the core DT logic:
import torch
import torch.nn as nn

class CircularOrchestratorDT(nn.Module):
    def __init__(self, state_dim, act_dim, hidden_dim, max_len, num_layers):
        super().__init__()
        self.state_dim = state_dim
        self.act_dim = act_dim
        self.max_len = max_len

        # Embeddings for the sequence tokens
        self.state_embed = nn.Linear(state_dim, hidden_dim)
        self.act_embed = nn.Linear(act_dim, hidden_dim)
        self.return_embed = nn.Linear(1, hidden_dim)  # For the target circularity return
        self.time_embed = nn.Embedding(max_len, hidden_dim)

        # The core Transformer
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim, nhead=8, dim_feedforward=4 * hidden_dim,
            dropout=0.1, activation='gelu', batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)

        # Prediction heads
        self.act_head = nn.Linear(hidden_dim, act_dim)
        self.state_head = nn.Linear(hidden_dim, state_dim)

    def forward(self, states, actions, returns_to_go, timesteps):
        # states:        (batch, seq_len, state_dim)
        # actions:       (batch, seq_len, act_dim), or None at inference time
        # returns_to_go: (batch, seq_len, 1)
        # timesteps:     (batch, seq_len)
        batch_size, seq_len = states.shape[0], states.shape[1]

        # Create token embeddings
        state_emb = self.state_embed(states)  # (batch, seq_len, hidden)
        act_emb = self.act_embed(actions) if actions is not None else torch.zeros_like(state_emb)
        return_emb = self.return_embed(returns_to_go)
        time_emb = self.time_embed(timesteps)

        # Interleave tokens in the order [return, state, action] for each timestep.
        # This is a key pattern I discovered through trial and error for best conditioning.
        tokens = torch.stack([return_emb, state_emb, act_emb], dim=2)  # (batch, seq_len, 3, hidden)
        tokens = tokens.reshape(batch_size, 3 * seq_len, -1)
        tokens = tokens + time_emb.repeat_interleave(3, dim=1)

        # Apply a causal mask so each token only attends to past tokens
        causal_mask = nn.Transformer.generate_square_subsequent_mask(3 * seq_len).to(tokens.device)
        transformer_out = self.transformer(tokens, mask=causal_mask)

        # Reshape back and predict the next action from the state token of the last
        # timestep (the action at that step is unknown at inference time)
        transformer_out = transformer_out.reshape(batch_size, seq_len, 3, -1)
        next_action_logits = self.act_head(transformer_out[:, -1, 1, :])
        return next_action_logits

# Example of conditioning for a high circularity target
model = CircularOrchestratorDT(state_dim=128, act_dim=16, hidden_dim=256, max_len=64, num_layers=4)
target_return_to_go = torch.tensor([[[0.95]]])   # Aim for a 95% circularity score
current_state = torch.randn(1, 1, 128)           # Graph state of the supply chain
predicted_action = model(current_state, None, target_return_to_go, timesteps=torch.tensor([[0]]))
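For completeness, here is a minimal sketch of the kind of GNN encoder mentioned above, pooling the supply chain graph into the flat state vector the DT consumes. The single round of mean-aggregated message passing and the dimensions are illustrative; a production version would more likely use a graph library such as PyTorch Geometric:

import torch
import torch.nn as nn

class SupplyChainGraphEncoder(nn.Module):
    """Minimal graph encoder over a dense facility adjacency matrix (illustrative)."""
    def __init__(self, node_dim, hidden_dim, state_dim):
        super().__init__()
        self.node_proj = nn.Linear(node_dim, hidden_dim)
        self.message_proj = nn.Linear(hidden_dim, hidden_dim)
        self.readout = nn.Linear(hidden_dim, state_dim)

    def forward(self, node_features, adjacency):
        # node_features: (num_nodes, node_dim), adjacency: (num_nodes, num_nodes)
        h = torch.relu(self.node_proj(node_features))
        # One round of mean-aggregated message passing between connected facilities
        degree = adjacency.sum(dim=-1, keepdim=True).clamp(min=1.0)
        h = torch.relu(h + self.message_proj(adjacency @ h) / degree)
        # Mean-pool the node embeddings into the flat state vector the DT consumes
        return self.readout(h.mean(dim=0))

# Hypothetical example: 6 facilities with 16 features each -> a 128-dim DT state
encoder = SupplyChainGraphEncoder(node_dim=16, hidden_dim=64, state_dim=128)
graph_state = encoder(torch.randn(6, 16), (torch.rand(6, 6) > 0.5).float())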
Implementation Detail 2: Human Preference Learning via Bayesian Reward Modeling
One of the most challenging parts of my experimentation was dealing with conflicting objectives. A business manager wants profit, an environmental officer wants low carbon, and a social auditor wants safe working conditions. The agent needs to infer a composite reward. I implemented a Bayesian linear model for reward inference, where human feedback (e.g., "prioritize this disassembly line") updates the belief over reward weights.
import numpy as np

class BayesianRewardInference:
    def __init__(self, feature_dim):
        # Prior: assume a multivariate normal over reward weights w
        # w ~ N(prior_mu, prior_sigma)
        self.prior_mu = np.zeros(feature_dim)
        self.prior_sigma = np.eye(feature_dim) * 2.0
        self.posterior_mu = self.prior_mu.copy()
        self.posterior_sigma = self.prior_sigma.copy()
        self.feature_dim = feature_dim

    def update_with_human_choice(self, chosen_trajectory_features, alternative_trajectory_features):
        """
        Human chooses one trajectory over another.
        Implements an approximate Bayesian update for a logistic (Bradley-Terry)
        likelihood, using the current posterior as the prior so successive
        feedback accumulates.
        """
        # Feature difference: phi(chosen) - phi(alternative)
        delta_phi = chosen_trajectory_features - alternative_trajectory_features

        # Laplace approximation of P(w | choice) ∝ sigmoid(w·δφ) * N(w | posterior_mu, posterior_sigma).
        # In my full experiments, I used a more sophisticated MCMC sampler for this.
        # Step 1: find the MAP estimate with simple gradient ascent (in practice, use SciPy optimizers).
        w_map = self.posterior_mu.copy()
        learning_rate = 0.01
        for _ in range(100):
            # Gradient of the logistic log-likelihood
            exp_term = np.exp(-np.dot(w_map, delta_phi))
            grad_logistic = delta_phi * exp_term / (1.0 + exp_term)
            # Gradient of the Gaussian log-prior
            grad_prior = -np.linalg.solve(self.posterior_sigma, w_map - self.posterior_mu)
            w_map += learning_rate * (grad_logistic + grad_prior)

        # Step 2: posterior covariance from the Hessian of the negative log posterior at the MAP
        p = 1.0 / (1.0 + np.exp(-np.dot(w_map, delta_phi)))
        hessian = np.linalg.inv(self.posterior_sigma) + p * (1.0 - p) * np.outer(delta_phi, delta_phi)
        self.posterior_mu = w_map
        self.posterior_sigma = np.linalg.inv(hessian)

    def sample_reward_weights(self, n_samples=1):
        """Sample reward weights from the current posterior."""
        return np.random.multivariate_normal(self.posterior_mu, self.posterior_sigma, size=n_samples)

# Example: the human chooses a trajectory with higher material purity over slightly higher profit
inferrer = BayesianRewardInference(feature_dim=3)   # e.g., [profit, carbon, material_purity]
chosen_feats = np.array([0.80, 0.90, 0.95])         # High purity
alt_feats = np.array([0.85, 0.90, 0.70])            # Higher profit, lower purity
inferrer.update_with_human_choice(chosen_feats, alt_feats)
sampled_weights = inferrer.sample_reward_weights()
print("Sampled reward weights (profit, carbon, purity):", sampled_weights[0])
Implementation Detail 3: Embodied Worker Agent with Real-Time Feedback
The embodied agents are where the policy meets the physical world. I built a simulation environment using PyBullet to test a disassembly robot agent. The key learning was the need for a hybrid architecture: a fast, reactive low-level controller (e.g., impedance control for unscrewing) and a slower, deliberative high-level planner that adjusts the task sequence based on visual feedback.
# Simplified structure of an embodied worker agent (perception, state formatting,
# and orchestrator-messaging helpers are omitted for brevity)
import time
import torch

class DisassemblyEmbodiedAgent:
    def __init__(self, policy_checkpoint_path, robot_interface):
        self.high_level_policy = torch.load(policy_checkpoint_path)  # A small DT
        self.robot_interface = robot_interface                       # Driver for the physical arm
        self.current_task = None
        self.perception_buffer = []
        self.feedback_history = []

    def execute_work_order(self, work_order, sensor_stream):
        """
        work_order: from the Orchestrator, e.g., {'product_id': 'phone_x', 'goal': 'extract_battery'}
        sensor_stream: generator yielding (camera_image, force_torque_readings)
        """
        self.current_task = work_order
        task_complete = False
        while not task_complete:
            # 1. Get the current observation from the sensors
            image, ft_sensor = next(sensor_stream)
            obs = self._process_observation(image, ft_sensor)

            # 2. High-level decision: what sub-action to take next?
            #    Condition the DT on the remaining sub-task and recent feedback
            state_tensor = self._format_state(obs, work_order['goal'], self.feedback_history[-5:])
            return_condition = torch.tensor([[[1.0]]])  # Aim for successful completion
            sub_action_logits = self.high_level_policy(
                state_tensor, None, return_condition, timesteps=torch.tensor([[0]])
            )
            sub_action = torch.argmax(sub_action_logits, dim=-1).item()  # e.g., 0=approach, 1=grasp, 2=unscrew, 3=extract

            # 3. Execute via the low-level controller (reactive, hard-coded or trained policy)
            low_level_success, feedback = self._execute_low_level(sub_action, obs)

            # 4. Log feedback for the learning loop
            self.feedback_history.append({
                'sub_action': sub_action,
                'success': low_level_success,
                'raw_feedback': feedback,
                'timestamp': time.time()
            })

            # 5. Check for task completion or failure
            if self._check_goal_met(obs, work_order['goal']):
                task_complete = True
                self._send_completion_signal_to_orchestrator(work_order, success=True)
            elif self._check_failure(obs, self.feedback_history):
                task_complete = True
                self._send_completion_signal_to_orchestrator(work_order, success=False, reason='blockage')

    def _execute_low_level(self, sub_action_id, observation):
        """Reactive controller. In my experiments, this was often a PID or impedance controller."""
        if sub_action_id == 2:  # Unscrew
            # Use force-torque feedback to maintain contact while rotating
            desired_force = 5.0                              # Newtons, to maintain contact
            current_force = observation['ft_reading'][2]
            force_error = desired_force - current_force
            # Simple P-controller for force along Z, while rotating about Z
            corrective_z_velocity = 0.1 * force_error
            screw_rotation_velocity = 0.5                    # rad/s
            # Send the command to the robot
            command = {'type': 'velocity', 'z': corrective_z_velocity, 'rz': screw_rotation_velocity}
            success, actual_feedback = self.robot_interface.execute(command)
            return success, {'force_error': force_error, 'rotation_applied': screw_rotation_velocity}
        # ... other sub-actions (approach, grasp, extract) follow the same pattern
        return False, {}
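For testing this agent outside the physical cell, the sensor_stream argument can be faked. The sketch below only shows the (camera_image, force_torque_readings) format the loop expects; the shapes are assumptions, and in the actual prototype the frames came from the PyBullet simulation rather than zero arrays:

import numpy as np

def simulated_sensor_stream(num_steps=500):
    """Stand-in generator yielding (camera_image, force_torque_readings) tuples."""
    for _ in range(num_steps):
        camera_image = np.zeros((480, 640, 3), dtype=np.uint8)   # placeholder RGB frame
        force_torque = np.zeros(6)                               # Fx, Fy, Fz, Tx, Ty, Tz
        yield camera_image, force_torque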
Real-World Applications & Challenges
During my prototyping, I applied a scaled-down version of this architecture to a simulated laptop refurbishment pipeline. The challenges were manifold, and each one led to an important lesson.
Challenge 1: Non-Stationarity of the Physical World. A disassembly line doesn't have fixed dynamics. A batch of phones might use more adhesive than the previous batch. My initial DT, trained on historical data, failed catastrophically. Solution: Implemented a contextual bandit meta-layer that quickly identifies the "context" (e.g., "high-adhesive variant") and selects a fine-tuned policy from a library.
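A sketch of that meta-layer is below; the context labels, policy names, and Thompson-sampling bookkeeping are illustrative rather than the exact implementation:

import numpy as np

class ContextualPolicySelector:
    """Keeps a Beta posterior over the success rate of each (context, policy) pair
    and picks a policy for the detected context by Thompson sampling."""
    def __init__(self, policy_names):
        self.policy_names = policy_names
        self.alpha = {}   # (context, policy) -> 1 + observed successes
        self.beta = {}    # (context, policy) -> 1 + observed failures

    def select(self, context):
        samples = {}
        for name in self.policy_names:
            a = self.alpha.get((context, name), 1.0)
            b = self.beta.get((context, name), 1.0)
            samples[name] = np.random.beta(a, b)
        return max(samples, key=samples.get)

    def update(self, context, name, succeeded):
        key = (context, name)
        self.alpha[key] = self.alpha.get(key, 1.0) + (1.0 if succeeded else 0.0)
        self.beta[key] = self.beta.get(key, 1.0) + (0.0 if succeeded else 1.0)

# e.g. the context could be inferred from early force-torque readings on a batch
selector = ContextualPolicySelector(['standard', 'high_adhesive', 'fragile'])
policy_name = selector.select(context='high_adhesive_batch')
selector.update('high_adhesive_batch', policy_name, succeeded=True)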