Human-Aligned Decision Transformers for circular manufacturing supply chains with embodied agent feedback loops

Rikin Patel

Introduction: A Learning Journey from Linear to Circular Intelligence

My journey into this fascinating intersection of AI and sustainable manufacturing began not with a grand theory, but with a frustrating debugging session. I was working on a reinforcement learning agent designed to optimize a traditional linear supply chain—maximize throughput, minimize cost. The agent, a sophisticated PPO model, was performing brilliantly on the metrics we had defined. Yet, during a demo for a client in the electronics manufacturing sector, a simple question derailed everything: "But what happens to all this plastic housing when the product reaches end-of-life? Your model just ships it to landfill."

In that moment, I realized the profound limitation of our approach. We were optimizing a broken, linear system. The metrics were wrong. The objective was misaligned. This sparked a months-long research and experimentation phase where I dove into circular economy principles, studied the emerging field of offline reinforcement learning, and began prototyping agents that could reason about entire material lifecycles. Through studying papers on Decision Transformers and human-in-the-loop RL, I learned that the key wasn't just better optimization, but a fundamental re-alignment of the AI's objective function with multi-stakeholder, long-term human values. This article is a synthesis of that exploration—a technical blueprint for building AI systems that don't just make supply chains efficient, but make them circular, regenerative, and human-aligned.

Technical Background: The Pillars of Circular AI

Before diving into the implementation, it's crucial to understand the three converging domains that form the foundation of this approach.

1. Decision Transformers (DTs): Unlike traditional RL algorithms that learn a policy through trial-and-error reward maximization, Decision Transformers frame sequential decision-making as a conditional sequence modeling problem. As I was experimenting with offline RL datasets, I found that DTs, built on transformer architectures, excel at leveraging historical trajectory data. They take a desired return (reward-to-go), past states, and actions, and autoregressively predict future actions. This "return-conditioned" behavior is perfect for circular systems where we want to specify how circular we aim to be (e.g., "achieve 95% material recovery") and have the agent figure out the sequence of actions to get there.
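
As a quick illustration (a minimal sketch, not code from the system described later), the return-to-go sequence that conditions a DT is just the suffix sum of per-step rewards, here an illustrative per-step circularity score:

import numpy as np

def returns_to_go(rewards):
    """Suffix sums: rtg[t] = sum of rewards from step t to the end of the trajectory."""
    return np.cumsum(rewards[::-1])[::-1]

# Illustrative per-step circularity rewards for one logged supply-chain trajectory
step_rewards = np.array([0.10, 0.25, 0.20, 0.40])
print(returns_to_go(step_rewards))  # rtg values: 0.95, 0.85, 0.60, 0.40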

2. Human-Alignment & Inverse Reward Design: A core insight from my research into AI safety is that we rarely can specify a perfect reward function. In circular manufacturing, the "reward" is a complex, often contradictory blend of economic cost, carbon footprint, social impact, and material purity. Human-alignment techniques, inspired by work like Cooperative Inverse Reinforcement Learning (CIRL), don't just learn to maximize a proxy reward. They learn a model of human preferences and uncertainties. During my investigation, I implemented a Bayesian approach where the agent maintains a posterior distribution over possible true reward functions based on sparse human feedback.
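
To make this concrete before the full implementation later in the article, here is a minimal sketch of how a posterior over reward weights might be used: candidate plans are scored under several sampled weight vectors, and strong disagreement between the samples is a signal to request more human feedback. The weight vectors and feature values below are purely illustrative.

import numpy as np

# Hypothetical posterior samples over reward weights for features [profit, carbon, material_purity]
posterior_samples = np.array([
    [0.5, 0.2, 0.3],   # one plausible human trade-off
    [0.2, 0.3, 0.5],   # another that weights material purity more heavily
    [0.3, 0.4, 0.3],
])

def score_plan(plan_features, weight_samples):
    """Score a candidate plan under each sampled reward hypothesis."""
    scores = weight_samples @ plan_features   # one score per hypothesis
    return scores.mean(), scores.std()        # expected value and disagreement

refurbish_plan = np.array([0.7, 0.8, 0.95])   # illustrative feature values
recycle_plan = np.array([0.8, 0.6, 0.60])

for name, feats in [("refurbish", refurbish_plan), ("recycle", recycle_plan)]:
    mean, spread = score_plan(feats, posterior_samples)
    # High spread means the reward hypotheses disagree, a good moment to query a human
    print(f"{name}: expected score {mean:.2f}, hypothesis disagreement {spread:.2f}")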

3. Embodied Agent Feedback Loops: This is where the digital meets the physical. A supply chain isn't just software—it involves robots, sorting arms, sensors, and logistics vehicles. An embodied agent is one that perceives and acts within a physical environment. The feedback loop is critical: actions (e.g., "disassemble product X") change the physical state, which is measured (e.g., "component Y is damaged"), and this new state informs the next decision. My exploration of robotic simulation platforms like NVIDIA Isaac Sim revealed the challenge of sim-to-real transfer for delicate disassembly tasks, which directly informed the architecture's design.

Architecture: The Core System Design

The proposed system is a multi-agent hierarchy where a central Orchestrator DT makes high-level strategic decisions (e.g., route this batch of returned smartphones to refurbishment vs. recycling), and multiple Embodied Worker Agents execute physical tasks, providing continuous feedback.

┌─────────────────────────────────────────────────────────────┐
│                 Human Preference Interface                   │
│  (Sparse feedback, constraint specification, value tuning)  │
└───────────────────────┬─────────────────────────────────────┘
                        │
┌───────────────────────▼─────────────────────────────────────┐
│            Orchestrator Decision Transformer                 │
│  - Models full supply chain graph state                      │
│  - Return-conditioned on circularity KPIs                    │
│  - Outputs high-level work orders & routes                   │
└───────────────────────┬─────────────────────────────────────┘
                        │
         ┌──────────────┼──────────────┬──────────────┐
         │              │              │              │
   ┌─────▼─────┐  ┌─────▼─────┐  ┌─────▼─────┐  ┌─────▼─────┐
   │Disassembly│  │  Sorting  │  │ Refurbish │  │ Logistics │
   │   Agent   │  │   Agent   │  │   Agent   │  │   Agent   │
   │ (Robotic) │  │ (Vision)  │  │  (Mixed)  │  │  (Fleet)  │
   └─────┬─────┘  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘
         │              │              │              │
┌────────▼──────────────▼──────────────▼──────────────▼──────┐
│               Physical World & Sensor Network               │
│   (Returned products, components, bins, transport lines)    │
└─────────────────────────────────────────────────────────────┘
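
To pin down the interface between the Orchestrator and the worker agents, here is a minimal sketch of what a work order and its completion signal might contain. The field names are illustrative assumptions, chosen to match the dictionary the disassembly agent consumes later in this article.

from dataclasses import dataclass, field
from typing import Optional

@dataclass
class WorkOrder:
    """High-level instruction issued by the Orchestrator DT to an embodied worker agent."""
    product_id: str                      # e.g., 'phone_x'
    goal: str                            # e.g., 'extract_battery' or 'route_to_refurbish'
    target_circularity: float = 0.95     # the return-to-go the Orchestrator was conditioned on
    deadline_s: Optional[float] = None   # optional soft deadline in seconds

@dataclass
class WorkerFeedback:
    """Completion signal an embodied agent sends back up to the Orchestrator."""
    product_id: str
    success: bool
    reason: str = ""                     # e.g., 'blockage'
    recovered_components: list = field(default_factory=list)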

Implementation Detail 1: The Orchestrator Decision Transformer

The Orchestrator is the brain. It operates on a temporal graph representation of the supply chain. My experimentation showed that a Graph Neural Network (GNN) encoder before the transformer was essential to capture relational data (e.g., which refurbishment center is near which recycling hub).
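
The GNN encoder itself is not shown in the DT code below, so here is a minimal, self-contained sketch of the idea: plain PyTorch message passing over a dense adjacency matrix rather than my exact encoder, with illustrative dimensions and a hypothetical SupplyChainGraphEncoder name.

import torch
import torch.nn as nn

class SupplyChainGraphEncoder(nn.Module):
    """Pools facility-level node features into a single state vector for the Orchestrator DT."""
    def __init__(self, node_dim, hidden_dim, state_dim):
        super().__init__()
        self.msg = nn.Linear(node_dim, hidden_dim)                # transform neighbor features
        self.upd = nn.Linear(node_dim + hidden_dim, hidden_dim)   # combine self + aggregated messages
        self.readout = nn.Linear(hidden_dim, state_dim)           # graph-level embedding

    def forward(self, node_feats, adj):
        # node_feats: (num_nodes, node_dim); adj: (num_nodes, num_nodes) normalized adjacency
        messages = adj @ self.msg(node_feats)                     # aggregate neighbor information
        h = torch.relu(self.upd(torch.cat([node_feats, messages], dim=-1)))
        return self.readout(h.mean(dim=0))                        # mean-pool to one state vector

# Illustrative usage: 6 facilities with 16 features each (capacity, backlog, distance, ...)
encoder = SupplyChainGraphEncoder(node_dim=16, hidden_dim=64, state_dim=128)
node_feats = torch.randn(6, 16)
adj = torch.eye(6)   # placeholder adjacency; in practice, facility-to-facility links
state_vector = encoder(node_feats, adj)   # shape (128,), the "current state" the DT consumes

The pooled state_vector plays the role of the 128-dimensional graph state passed to the Orchestrator in the conditioning example at the end of the next code block.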

Here's a simplified PyTorch implementation of the core DT logic:

import torch
import torch.nn as nn
import torch.nn.functional as F

class CircularOrchestratorDT(nn.Module):
    def __init__(self, state_dim, act_dim, hidden_dim, max_len, num_layers):
        super().__init__()
        self.state_dim = state_dim
        self.act_dim = act_dim
        self.max_len = max_len
        self.return_conditioning = True

        # Embeddings for the sequence tokens
        self.state_embed = nn.Linear(state_dim, hidden_dim)
        self.act_embed = nn.Linear(act_dim, hidden_dim)
        self.return_embed = nn.Linear(1, hidden_dim) # For target circularity return
        self.time_embed = nn.Embedding(max_len, hidden_dim)

        # The core Transformer
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim, nhead=8, dim_feedforward=4*hidden_dim,
            dropout=0.1, activation='gelu', batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)

        # Prediction heads
        self.act_head = nn.Linear(hidden_dim, act_dim)
        self.state_head = nn.Linear(hidden_dim, state_dim)

    def forward(self, states, actions, returns_to_go, timesteps):
        # states: (batch, seq_len, state_dim)
        # actions: (batch, seq_len, act_dim) or (batch, seq_len-1, act_dim)
        # returns_to_go: (batch, seq_len, 1)
        # timesteps: (batch, seq_len)

        batch_size, seq_len = states.shape[0], states.shape[1]

        # Create token embeddings
        state_emb = self.state_embed(states) # (batch, seq_len, hidden)
        act_emb = self.act_embed(actions) if actions is not None else torch.zeros_like(state_emb)
        return_emb = self.return_embed(returns_to_go)
        time_emb = self.time_embed(timesteps)

        # Interleave tokens as [return_t, state_t, action_t] for each timestep t.
        # This is a key pattern I discovered through trial and error for best conditioning.
        tokens = torch.stack([return_emb, state_emb, act_emb], dim=2)  # (batch, seq_len, 3, hidden)
        tokens = tokens.reshape(batch_size, 3*seq_len, -1)             # interleaved token sequence
        tokens = tokens + time_emb.repeat_interleave(3, dim=1)         # same timestep embedding for each triple

        # Apply causal mask so each token only sees past tokens
        causal_mask = nn.Transformer.generate_square_subsequent_mask(3*seq_len).to(tokens.device)
        transformer_out = self.transformer(tokens, mask=causal_mask)

        # Reshape back and predict the next action from the *state* token of the last timestep
        # (predicting from the action token would leak the ground-truth action during training)
        transformer_out = transformer_out.reshape(batch_size, seq_len, 3, -1)
        next_action_logits = self.act_head(transformer_out[:, -1, 1, :])

        return next_action_logits

# Example of conditioning for a high circularity target (dimensions here are illustrative)
model = CircularOrchestratorDT(state_dim=128, act_dim=16, hidden_dim=256, max_len=64, num_layers=4)
target_return_to_go = torch.tensor([[[0.95]]])    # Aim for a 95% circularity score
current_state = torch.randn(1, 1, 128)            # Graph-encoded state of the supply chain
timesteps = torch.zeros(1, 1, dtype=torch.long)
predicted_action = model(current_state, None, target_return_to_go, timesteps)

Implementation Detail 2: Human Preference Learning via Bayesian Reward Modeling

One of the most challenging parts of my experimentation was dealing with conflicting objectives. A business manager wants profit, an environmental officer wants low carbon, and a social auditor wants safe working conditions. The agent needs to infer a composite reward. I implemented a Bayesian linear model for reward inference, where human feedback (e.g., "prioritize this disassembly line") updates the belief over reward weights.

import numpy as np
from scipy import stats

class BayesianRewardInference:
    def __init__(self, feature_dim):
        # Prior: Assume a multivariate normal over reward weights w
        # w ~ N(prior_mu, prior_sigma)
        self.prior_mu = np.zeros(feature_dim)
        self.prior_sigma = np.eye(feature_dim) * 2.0
        self.posterior_mu = self.prior_mu.copy()
        self.posterior_sigma = self.prior_sigma.copy()
        self.feature_dim = feature_dim

    def update_with_human_choice(self, chosen_trajectory_features, alternative_trajectory_features):
        """
        Human chooses one trajectory over another.
        Implements Bayesian update for a logistic likelihood model (Bradley-Terry).
        """
        # Feature difference: phi(chosen) - phi(alternative)
        delta_phi = chosen_trajectory_features - alternative_trajectory_features

        # For simplicity, use a MAP/Laplace-style approximation to update the posterior.
        # In my full experiments, I used a more sophisticated MCMC sampler for this.
        # This approximates P(w | choice) ∝ logistic(w·δφ) * N(w | posterior_mu, posterior_sigma),
        # treating the current posterior as the prior for each new preference (sequential update).
        def log_posterior(w):  # kept for reference; the ascent below climbs this objective
            log_likelihood = -np.log1p(np.exp(-np.dot(w, delta_phi)))  # Bradley-Terry / logistic
            log_prior = stats.multivariate_normal.logpdf(w, self.posterior_mu, self.posterior_sigma)
            return log_likelihood + log_prior

        # Simple gradient ascent to the MAP estimate (in practice, use SciPy optimizers)
        w_current = self.posterior_mu.copy()
        learning_rate = 0.01
        for _ in range(100):
            # Gradient of the logistic log-likelihood term
            exp_term = np.exp(-np.dot(w_current, delta_phi))
            grad_logistic = delta_phi * exp_term / (1 + exp_term)
            # Gradient of the Gaussian log-prior term
            grad_prior = -np.linalg.solve(self.posterior_sigma, w_current - self.posterior_mu)
            grad = grad_logistic + grad_prior
            w_current += learning_rate * grad

        self.posterior_mu = w_current
        # Simplified: the covariance is left unchanged here. A full Laplace approximation
        # would set posterior_sigma to the inverse negative Hessian of log_posterior at the MAP point.

    def sample_reward_weights(self, n_samples=1):
        """Sample reward weights from the current posterior."""
        return np.random.multivariate_normal(self.posterior_mu, self.posterior_sigma, size=n_samples)

# Example: Human chooses a trajectory with higher material purity over slightly higher profit
inferrer = BayesianRewardInference(feature_dim=3) # e.g., [profit, carbon, material_purity]
chosen_feats = np.array([0.8, 0.9, 0.95])  # High purity
alt_feats = np.array([0.85, 0.9, 0.70])   # Higher profit, lower purity
inferrer.update_with_human_choice(chosen_feats, alt_feats)
sampled_weights = inferrer.sample_reward_weights()
print("Sampled reward weights (profit, carbon, purity):", sampled_weights[0])

Implementation Detail 3: Embodied Worker Agent with Real-Time Feedback

The embodied agents are where the policy meets the physical world. I built a simulation environment using PyBullet to test a disassembly robot agent. The key lesson was the need for a hybrid architecture: a fast, reactive low-level controller (e.g., impedance control for unscrewing) and a slower, deliberative high-level planner that adjusts the task sequence based on visual feedback.

import time
import torch

# Simplified structure of an embodied worker agent
class DisassemblyEmbodiedAgent:
    def __init__(self, policy_checkpoint_path, robot_interface):
        self.high_level_policy = torch.load(policy_checkpoint_path)  # A small DT
        self.robot_interface = robot_interface  # Handle to the low-level robot control API
        self.current_task = None
        self.perception_buffer = []
        self.feedback_history = []

    def execute_work_order(self, work_order, sensor_stream):
        """
        work_order: from Orchestrator, e.g., {'product_id': 'phone_x', 'goal': 'extract_battery'}
        sensor_stream: generator yielding (camera_image, force_torque_readings)
        """
        self.current_task = work_order
        task_complete = False
        step_sequence = []

        while not task_complete:
            # 1. Get current observation from sensors
            image, ft_sensor = next(sensor_stream)
            obs = self._process_observation(image, ft_sensor)

            # 2. High-level decision: what sub-action to take next?
            # Condition the DT on the remaining sub-task and recent feedback
            state_tensor = self._format_state(obs, work_order['goal'], self.feedback_history[-5:])
            return_condition = torch.tensor([[[1.0]]])  # Aim for successful completion
            sub_action_logits = self.high_level_policy(
                state_tensor, None, return_condition,
                timesteps=torch.zeros(1, 1, dtype=torch.long)
            )
            sub_action = torch.argmax(sub_action_logits, dim=-1).item()  # e.g., 0=approach, 1=grasp, 2=unscrew, 3=extract

            # 3. Execute via low-level controller (reactive, hard-coded or trained policy)
            low_level_success, feedback = self._execute_low_level(sub_action, obs)

            # 4. Log feedback for learning loop
            self.feedback_history.append({
                'sub_action': sub_action,
                'success': low_level_success,
                'raw_feedback': feedback,
                'timestamp': time.time()
            })

            # 5. Check for task completion or failure
            if self._check_goal_met(obs, work_order['goal']):
                task_complete = True
                self._send_completion_signal_to_orchestrator(work_order, success=True)
            elif self._check_failure(obs, self.feedback_history):
                task_complete = True
                self._send_completion_signal_to_orchestrator(work_order, success=False, reason='blockage')

    def _execute_low_level(self, sub_action_id, observation):
        """Reactive controller. In my experiments, this was often a PID or impedance controller."""
        if sub_action_id == 2:  # Unscrew
            # Use force-torque feedback to maintain contact and rotate
            desired_force = 5.0  # Newtons, to maintain contact
            current_force = observation['ft_reading'][2]
            force_error = desired_force - current_force

            # Simple P-controller for force in Z, while rotating
            corrective_z_velocity = 0.1 * force_error
            screw_rotation_velocity = 0.5  # rad/s

            # Send command to robot
            command = {'type': 'velocity', 'z': corrective_z_velocity, 'rz': screw_rotation_velocity}
            success, actual_feedback = self.robot_interface.execute(command)

            return success, {'force_error': force_error, 'rotation_applied': screw_rotation_velocity}
        # ... other sub-actions (approach, grasp, extract) handled similarly
        return False, {}  # Fallback for sub-actions not covered in this simplified sketch

Real-World Applications & Challenges

During my prototyping, I applied a scaled-down version of this architecture to a simulated laptop refurbishment pipeline. The challenges were manifold, and each one taught an important lesson.

Challenge 1: Non-Stationarity of the Physical World. A disassembly line doesn't have fixed dynamics. A batch of phones might use more adhesive than the previous batch. My initial DT, trained on historical data, failed catastrophically. Solution: Implemented a contextual bandit meta-layer that quickly identifies the "context" (e.g., "high-adhesive variant") and selects a fine-tuned policy from a library.
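
Here is a minimal sketch of what such a meta-layer could look like, assuming an epsilon-greedy bandit rather than the exact algorithm I settled on; the context labels and policy names are illustrative.

import random
from collections import defaultdict

class PolicySelectionBandit:
    """Epsilon-greedy contextual bandit over a library of fine-tuned policies."""
    def __init__(self, policy_library, epsilon=0.1):
        self.policies = policy_library          # e.g., {'standard': ..., 'high_adhesive': ...}
        self.epsilon = epsilon
        self.counts = defaultdict(lambda: defaultdict(int))
        self.successes = defaultdict(lambda: defaultdict(int))

    def select(self, context):
        """Pick a policy name for an observed context (e.g., 'high_adhesive_batch')."""
        if random.random() < self.epsilon:
            return random.choice(list(self.policies))   # explore occasionally
        def success_rate(name):
            n = self.counts[context][name]
            return self.successes[context][name] / n if n else 0.0
        return max(self.policies, key=success_rate)     # exploit the best-known policy

    def update(self, context, policy_name, success):
        """Record whether the selected policy completed its work order."""
        self.counts[context][policy_name] += 1
        self.successes[context][policy_name] += int(success)

# Illustrative usage
bandit = PolicySelectionBandit({'standard': None, 'high_adhesive': None})
chosen = bandit.select('high_adhesive_batch')
bandit.update('high_adhesive_batch', chosen, success=True)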
