Source: MarkTechPost
In this tutorial, we explore Online Process Reward Learning (OPRL) and demonstrate how we can learn dense, step-level reward signals from trajectory preferences to solve sparse-reward reinforcement learning tasks. We walk through each component, from the maze environment and reward-model network to preference generation, training loops, and evaluation, while observing how the agent gradually improves its behaviour through online preference-driven shaping. By running this end-to-end implementation, we gain a practical understanding of how OPRL enables better credit assignment, faster learning, and more stable policy optimization in challenging environments where the agent would otherwise struggle to discover meaningful rewards. Check out the FULL CODE NOTEBOOK.
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
import matplotlib.pyplot as plt
from collections import deque
import random

# Fix all seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)


class MazeEnv:
    def __init__(self, size=8):
        self.size = size
        self.start = (0, 0)
        self.goal = (size-1, size-1)
        # Vertical wall in the middle column, rows 1..size-3
        self.obstacles = set([(i, size//2) for i in range(1, size-2)])
        self.reset()

    def reset(self):
        self.pos = self.start
        self.steps = 0
        return self._get_state()

    def _get_state(self):
        # One-hot encoding of the agent's cell
        state = np.zeros(self.size * self.size)
        state[self.pos[0] * self.size + self.pos[1]] = 1
        return state

    def step(self, action):
        moves = [(-1,0), (0,1), (1,0), (0,-1)]  # up, right, down, left
        new_pos = (self.pos[0] + moves[action][0], self.pos[1] + moves[action][1])
        # Blocked moves (out of bounds or into a wall) leave the agent in place
        if (0 <= new_pos[0] < self.size and 0 <= new_pos[1] < self.size
                and new_pos not in self.obstacles):
            self.pos = new_pos
        self.steps += 1
        done = self.pos == self.goal or self.steps >= 60
        # Sparse reward: only the goal cell pays out
        reward = 10.0 if self.pos == self.goal else 0.0
        return self._get_state(), reward, done

    def render(self):
        grid = [['.' for _ in range(self.size)] for _ in range(self.size)]
        for obs in self.obstacles:
            grid[obs[0]][obs[1]] = '█'
        grid[self.goal[0]][self.goal[1]] = 'G'
        grid[self.pos[0]][self.pos[1]] = 'A'
        return '\n'.join([''.join(row) for row in grid])


class ProcessRewardModel(nn.Module):
    def __init__(self, state_dim, hidden=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden),
            nn.LayerNorm(hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.LayerNorm(hidden),
            nn.ReLU(),
            nn.Linear(hidden, 1),
            nn.Tanh()  # per-state reward bounded in (-1, 1)
        )

    def forward(self, states):
        return self.net(states)

    def trajectory_reward(self, states):
        # Trajectory score = sum of per-state process rewards
        return self.forward(states).sum()


class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden=128):
        super().__init__()
        self.backbone = nn.Sequential(
            nn.Linear(state_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU()
        )
        self.actor = nn.Linear(hidden, action_dim)
        self.critic = nn.Linear(hidden, 1)

    def forward(self, state):
        features = self.backbone(state)
        return self.actor(features), self.critic(features)
We set up the foundation of our OPRL system by importing libraries, defining the maze environment, and building the reward and policy networks. Each state is a one-hot vector over the 64 grid cells, a vertical wall in the middle column blocks movement, and the only environment reward is +10 for reaching the goal, with episodes capped at 60 steps. We also define the two neural models: a process reward model that scores each state in (-1, 1) through a Tanh output, and an actor-critic policy network with a shared backbone.
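To make the state encoding and movement rules concrete, here is a minimal standalone sketch that mirrors the maze logic without depending on NumPy or PyTorch. The helper names `one_hot_state`, `try_move`, and `sparse_reward` are hypothetical and exist only for illustration; the constants are copied from the environment's defaults.

```python
# Illustrative sketch only: helper names are hypothetical, not part of the tutorial's classes.
SIZE = 8
GOAL = (SIZE - 1, SIZE - 1)
# Same wall layout as the environment: middle column, rows 1..SIZE-3.
OBSTACLES = {(i, SIZE // 2) for i in range(1, SIZE - 2)}
MOVES = [(-1, 0), (0, 1), (1, 0), (0, -1)]  # up, right, down, left


def one_hot_state(pos, size=SIZE):
    """Flatten (row, col) into a one-hot list of length size * size."""
    state = [0.0] * (size * size)
    state[pos[0] * size + pos[1]] = 1.0
    return state


def try_move(pos, action, size=SIZE):
    """Return the new position, or the old one if the move is blocked."""
    row = pos[0] + MOVES[action][0]
    col = pos[1] + MOVES[action][1]
    if 0 <= row < size and 0 <= col < size and (row, col) not in OBSTACLES:
        return (row, col)
    return pos


def sparse_reward(pos):
    """Only the goal cell yields a reward."""
    return 10.0 if pos == GOAL else 0.0


if __name__ == "__main__":
    print(one_hot_state((1, 3)).index(1.0))  # flat index of cell (1, 3)
    print(try_move((1, 3), 1))               # moving right into the wall
    print(try_move((0, 3), 1))               # row 0 has no wall
```

Because blocked moves still consume a step, an agent that keeps pushing against the wall uses up its 60-step budget without ever seeing a non-zero reward, which is what makes the task sparse.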
class OPRLAgent:
    def __init__(self, state_dim, action_dim, lr=3e-4):
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.reward_model = ProcessRewardModel(state_dim)
        self.policy_opt = Adam(self.policy.parameters(), lr=lr)
        self.reward_opt = Adam(self.reward_model.parameters(), lr=lr)
        self.trajectories = deque(maxlen=200)
        self.preferences = deque(maxlen=500)
        self.action_dim = action_dim

    def select_action(self, state, epsilon=0.1):
        # With probability epsilon, take a uniformly random action
        if random.random() < epsilon:
            return random.randint(0, self.action_dim - 1)
        # Otherwise sample from the policy's softmax distribution
        state_t = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            logits, _ = self.policy(state_t)
            probs = F.softmax(logits, dim=-1)
        return torch.multinomial(probs, 1).item()

    def collect_trajectory(self, env, epsilon=0.1):
        states, actions, rewards = [], [], []
        state = env.reset()
        done = False
        while not done:
            action = self.select_action(state, epsilon)
            next_state, reward, done = env.step(action)
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            state = next_state
        traj = {
            'states': torch.FloatTensor(np.array(states)),
            'actions': torch.LongTensor(actions),
            'rewards': torch.FloatTensor(rewards),
            'return': float(sum(rewards))
        }
        self.trajectories.append(traj)
        return traj
We begin constructing the OPRL agent by implementing action selection and trajectory collection. With probability ε the agent takes a uniformly random action; otherwise it samples from the policy's softmax distribution, so exploration is a mixture of random and policy-sampled actions rather than strictly ε-greedy selection. As we run the agent through the maze, we record each episode's states, actions, rewards, and total return, and store whole trajectories in a 200-entry buffer that later supplies preference data for the reward model.
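The action-selection rule can be sketched in plain Python as follows. This is an illustrative stand-in that takes raw logits instead of calling a network; `softmax`, `select_action`, and `action_distribution` are hypothetical helper names, and `action_distribution` is an addition that shows the effective probability of each action under the mixed rule.

```python
import math
import random


def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def select_action(logits, epsilon, rng):
    """With probability epsilon pick uniformly; otherwise sample from softmax(logits)."""
    n_actions = len(logits)
    if rng.random() < epsilon:
        return rng.randrange(n_actions)
    probs = softmax(logits)
    return rng.choices(range(n_actions), weights=probs, k=1)[0]


def action_distribution(logits, epsilon):
    """Effective per-action probability under the mixed rule above."""
    n_actions = len(logits)
    return [epsilon / n_actions + (1.0 - epsilon) * p for p in softmax(logits)]


if __name__ == "__main__":
    rng = random.Random(0)
    logits = [2.0, 0.5, 0.5, 0.0]
    print(select_action(logits, epsilon=0.1, rng=rng))
    print(action_distribution(logits, epsilon=0.1))
```

One consequence of this design is that setting ε to 0 does not make the rollout deterministic: actions are still drawn from the softmax distribution rather than chosen by argmax.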
    def generate_preference(self):
        if len(self.trajectories) < 2:
            return
        t1, t2 = random.sample(list(self.trajectories), 2)
        # Ties (equal returns) fall through to label 0.0
        label = 1.0 if t1['return'] > t2['return'] else 0.0
        self.preferences.append({'t1': t1, 't2': t2, 'label': label})

    def train_reward_model(self, n_updates=5):
        if len(self.preferences) < 32:
            return 0.0
        total_loss = 0.0
        for _ in range(n_updates):
            batch = random.sample(list(self.preferences), 32)
            loss = 0.0
            for item in batch:
                r1 = self.reward_model.trajectory_reward(item['t1']['states'])
                r2 = self.reward_model.trajectory_reward(item['t2']['states'])
                # Bradley-Terry: P(t1 preferred) = sigmoid(R(t1) - R(t2))
                logit = r1 - r2
                pred_prob = torch.sigmoid(logit)
                label = item['label']
                # Binary cross-entropy against the preference label
                loss += -(label * torch.log(pred_prob + 1e-8)
                          + (1-label) * torch.log(1 - pred_prob + 1e-8))
            loss = loss / len(batch)
            self.reward_opt.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.reward_model.parameters(), 1.0)
            self.reward_opt.step()
            total_loss += loss.item()
        return total_loss / n_updates
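We generate preference pairs from collected trajectories and train the process reward model using the Bradley–Terry formulation. For each pair, we sum the model's per-state scores over each trajectory, pass the difference through a sigmoid to get the probability that the first trajectory is preferred, and minimize binary cross-entropy against a label derived from the environment returns. Note that pairs with equal returns receive the label 0.0, which happens often early in training when most episodes never reach the goal. This allows us to learn dense, differentiable, step-level rewards that guide the agent even when the environment itself is sparse.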
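The preference loss for a single pair can be worked through with scalar trajectory scores, as in the sketch below. The functions `preference_prob` and `preference_loss` restate the sigmoid and cross-entropy steps in plain Python. `label_from_returns` is a hypothetical variant, not the tutorial's labelling rule: it assigns 0.5 to tied returns, which is one possible way to avoid treating ties as a preference for the second trajectory.

```python
import math


def preference_prob(score_1, score_2):
    """Bradley-Terry probability that trajectory 1 is preferred: sigmoid(R1 - R2)."""
    return 1.0 / (1.0 + math.exp(-(score_1 - score_2)))


def preference_loss(score_1, score_2, label, eps=1e-8):
    """Binary cross-entropy between the predicted preference and the label."""
    p = preference_prob(score_1, score_2)
    return -(label * math.log(p + eps) + (1.0 - label) * math.log(1.0 - p + eps))


def label_from_returns(return_1, return_2):
    """Hypothetical labelling variant: ties get 0.5 instead of 0.0."""
    if return_1 > return_2:
        return 1.0
    if return_1 < return_2:
        return 0.0
    return 0.5


if __name__ == "__main__":
    # Trajectory 1 reached the goal (return 10), trajectory 2 did not (return 0).
    label = label_from_returns(10.0, 0.0)
    print(preference_loss(2.0, 0.0, label))  # model agrees with the label: lower loss
    print(preference_loss(0.0, 2.0, label))  # model disagrees: higher loss
```

With a 0.5 label, the loss is minimized when the two trajectory scores are equal, so tied pairs pull the scores together instead of pushing the second trajectory above the first.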
    def train_policy(self, n_updates=3, gamma=0.98):
        if len(self.trajectories) < 5:
            return 0.0
        total_loss = 0.0
        for _ in range(n_updates):
            traj = random.choice(list(self.trajectories))
            with torch.no_grad():
                process_rewards = self.reward_model(traj['states']).squeeze()
            # Shaped reward = environment reward + 0.1 * learned process reward
            shaped_rewards = traj['rewards'] + 0.1 * process_rewards
            # Discounted returns, computed backwards through the trajectory
            returns = []
            G = 0
            for r in reversed(shaped_rewards.tolist()):
                G = r + gamma * G
                returns.insert(0, G)
            returns = torch.FloatTensor(returns)
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)
            logits, values = self.policy(traj['states'])
            log_probs = F.log_softmax(logits, dim=-1)
            action_log_probs = log_probs.gather(1, traj['actions'].unsqueeze(1))
            advantages = returns - values.squeeze().detach()
            policy_loss = -(action_log_probs.squeeze() * advantages).mean()
            value_loss = F.mse_loss(values.squeeze(), returns)
            entropy = -(F.softmax(logits, dim=-1) * log_probs).sum(-1).mean()
            loss = policy_loss + 0.5 * value_loss - 0.01 * entropy
            self.policy_opt.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 1.0)
            self.policy_opt.step()
            total_loss += loss.item()
        return total_loss / n_updates


def train_oprl(episodes=500, render_interval=100):
    env = MazeEnv(size=8)
    agent = OPRLAgent(state_dim=64, action_dim=4, lr=3e-4)
    returns, reward_losses, policy_losses = [], [], []
    success_rate = []
    for ep in range(episodes):
        # Linear decay from 0.5 down to a floor of 0.05
        epsilon = max(0.05, 0.5 - ep / 1000)
        traj = agent.collect_trajectory(env, epsilon)
        returns.append(traj['return'])
        if ep % 2 == 0 and ep > 10:
            agent.generate_preference()
        if ep > 20 and ep % 2 == 0:
            rew_loss = agent.train_reward_model(n_updates=3)
            reward_losses.append(rew_loss)
        if ep > 10:
            pol_loss = agent.train_policy(n_updates=2)
            policy_losses.append(pol_loss)
        success = 1 if traj['return'] > 5 else 0
        success_rate.append(success)
        # Periodically roll out with epsilon=0 (actions are still sampled
        # from the policy) and print the final grid
        if ep % render_interval == 0 and ep > 0:
            test_env = MazeEnv(size=8)
            agent.collect_trajectory(test_env, epsilon=0)
            print(test_env.render())
    return returns, reward_losses, policy_losses, success_rate
We train the policy using shaped rewards: the environment reward plus 0.1 times the process reward that the learned model predicts for each visited state. We compute discounted returns (γ = 0.98), normalize them, and combine a policy-gradient loss on the advantages with a value loss and an entropy bonus, enabling the agent to improve its strategy over time. We then build the full training loop, in which ε decays linearly from 0.5 to a floor of 0.05, preferences accumulate every other episode, and both the reward model and the policy are updated online.
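The reward shaping, discounting, and exploration schedule can be checked by hand with a small plain-Python sketch. The function names here are hypothetical; the constants (shaping weight 0.1, γ = 0.98, and the ε schedule) are taken from the training code, and the process-reward values in the example are made up for illustration.

```python
def shaped_rewards(env_rewards, process_rewards, weight=0.1):
    """Environment reward plus a weighted per-step process reward."""
    return [r + weight * p for r, p in zip(env_rewards, process_rewards)]


def discounted_returns(rewards, gamma=0.98):
    """Return-to-go for every step, computed backwards in one pass."""
    returns = []
    running = 0.0
    for r in reversed(rewards):
        running = r + gamma * running
        returns.append(running)
    returns.reverse()
    return returns


def epsilon_at(episode):
    """Linear decay from 0.5 with a floor of 0.05."""
    return max(0.05, 0.5 - episode / 1000)


if __name__ == "__main__":
    env_r = [0.0, 0.0, 10.0]    # sparse: only the final step pays out
    proc_r = [0.2, 0.6, 0.9]    # made-up process rewards in (-1, 1)
    print(shaped_rewards(env_r, proc_r))
    print(discounted_returns(shaped_rewards(env_r, proc_r)))
    print([epsilon_at(ep) for ep in (0, 100, 450, 499)])
```

In an episode that never reaches the goal, every environment reward is zero, so the shaped returns come entirely from the process-reward term; this is the mechanism through which the learned model can give the policy a training signal on failed episodes.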
print("Training OPRL Agent on Sparse Reward Maze...\n")
returns, rew_losses, pol_losses, success = train_oprl(episodes=500, render_interval=250)

fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Top-left: raw returns plus a 20-episode moving average
axes[0,0].plot(returns, alpha=0.3)
axes[0,0].plot(np.convolve(returns, np.ones(20)/20, mode='valid'), linewidth=2)
axes[0,0].set_xlabel('Episode')
axes[0,0].set_ylabel('Return')
axes[0,0].set_title('Agent Performance')
axes[0,0].grid(alpha=0.3)

# Top-right: smoothed goal success rate
success_smooth = np.convolve(success, np.ones(20)/20, mode='valid')
axes[0,1].plot(success_smooth, linewidth=2, color='green')
axes[0,1].set_xlabel('Episode')
axes[0,1].set_ylabel('Success Rate')
axes[0,1].set_title('Goal Success Rate')
axes[0,1].grid(alpha=0.3)

# Bottom-left: reward-model loss
axes[1,0].plot(rew_losses, linewidth=2, color='orange')
axes[1,0].set_xlabel('Update Step')
axes[1,0].set_ylabel('Loss')
axes[1,0].set_title('Reward Model Loss')
axes[1,0].grid(alpha=0.3)

# Bottom-right: policy loss
axes[1,1].plot(pol_losses, linewidth=2, color='red')
axes[1,1].set_xlabel('Update Step')
axes[1,1].set_ylabel('Loss')
axes[1,1].set_title('Policy Loss')
axes[1,1].grid(alpha=0.3)

plt.tight_layout()
plt.show()

print("OPRL Training Complete!")
print("Process rewards, preference learning, reward shaping, and online updates demonstrated.")
We visualize the learning dynamics by plotting returns, success rates, reward-model loss, and policy loss. The return and success curves are smoothed with a 20-episode moving average so that trends remain visible through episode-to-episode noise. Together, the four panels let us monitor how the agent’s performance evolves as OPRL shapes the reward landscape in this sparse-reward maze.
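The smoothing step can be reproduced without NumPy, which makes the window arithmetic explicit. The sketch below uses a hypothetical `moving_average` helper. For inputs at least as long as the window it is meant to match a box-filter convolution in 'valid' mode; shorter inputs simply return an empty list here, which is a simplification.

```python
def moving_average(values, window=20):
    """Box-filter moving average that keeps only fully covered windows."""
    if window <= 0:
        raise ValueError("window must be positive")
    if len(values) < window:
        return []  # simplification: no full window is available
    running = sum(values[:window])
    smoothed = [running / window]
    for i in range(window, len(values)):
        # Slide the window one step: add the new value, drop the oldest
        running += values[i] - values[i - window]
        smoothed.append(running / window)
    return smoothed


if __name__ == "__main__":
    print(moving_average([1, 2, 3, 4], window=2))
    # 500 episodes with a 20-episode window leave 481 smoothed points
    print(len(moving_average([0.0] * 500, window=20)))
```

Since the smoothed series is 19 points shorter than the raw one, plotting both against their default indices places each smoothed value at the start of its window; passing explicit x positions such as `range(19, 500)` would be one way to align it with the last episode of each window instead.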
In conclusion, we see how OPRL turns sparse terminal outcomes into dense online feedback that continuously guides the agent’s behaviour. The process reward model learns from trajectory preferences, shapes the return signal, and is intended to speed up the policy’s progress toward the goal. The same setup can be extended to larger mazes, different shaping strengths, or real human preference feedback, which makes OPRL a flexible framework for credit assignment in complex decision-making tasks. We finish with a hands-on understanding of how OPRL operates and how we can extend it to more advanced agentic RL settings.
Check out the FULL CODE NOTEBOOK and Paper.
Asif Razzaq
