import os
import os.path as osp
import random
from collections import deque
from time import time, sleep

import numpy as np
import tensorflow as tf
from keras import backend as K
from parallel_trpo.train import train_parallel_trpo
from pposgd_mpi.run_mujoco import train_pposgd_mpi

from rl_teacher.comparison_collectors import SyntheticComparisonCollector, HumanComparisonCollector
from rl_teacher.envs import get_timesteps_per_episode
from rl_teacher.envs import make_with_torque_removed
from rl_teacher.label_schedules import LabelAnnealer, ConstantLabelSchedule
from rl_teacher.nn import FullyConnectedMLP
from rl_teacher.segment_sampling import sample_segment_from_path
from rl_teacher.segment_sampling import segments_from_rand_rollout
from rl_teacher.summaries import AgentLogger, make_summary_writer
from rl_teacher.utils import slugify, corrcoef
from rl_teacher.video import SegmentVideoRecorder

CLIP_LENGTH = 1.5


class TraditionalRLRewardPredictor(object):
    """Predictor that always returns the true reward provided by the environment."""

    def __init__(self, summary_writer):
        self.agent_logger = AgentLogger(summary_writer)

    def predict_reward(self, path):
        self.agent_logger.log_episode(path)  # <-- This may cause problems in future versions of Teacher.
        return path["original_rewards"]

    def path_callback(self, path):
        pass


class ComparisonRewardPredictor():
    """Predictor that trains a model to predict how much reward is contained in a trajectory segment."""

    def __init__(self, env, summary_writer, comparison_collector, agent_logger, label_schedule):
        self.summary_writer = summary_writer
        self.agent_logger = agent_logger
        self.comparison_collector = comparison_collector
        self.label_schedule = label_schedule

        # Set up some bookkeeping
        self.recent_segments = deque(maxlen=200)  # Keep a queue of recently seen segments to pull new comparisons from
        self._frames_per_segment = CLIP_LENGTH * env.fps
        self._steps_since_last_training = 0
        self._n_timesteps_per_predictor_training = 1e2  # How often should we train our predictor?
        self._elapsed_predictor_training_iters = 0

        # Build and initialize our predictor model
        config = tf.ConfigProto(
            device_count={'GPU': 0}
        )
        self.sess = tf.InteractiveSession(config=config)
        self.obs_shape = env.observation_space.shape
        self.discrete_action_space = not hasattr(env.action_space, "shape")
        self.act_shape = (env.action_space.n,) if self.discrete_action_space else env.action_space.shape
        self.graph = self._build_model()
        self.sess.run(tf.global_variables_initializer())

    def _predict_rewards(self, obs_segments, act_segments, network):
        """
        :param obs_segments: tensor with shape = (batch_size, segment_length) + obs_shape
        :param act_segments: tensor with shape = (batch_size, segment_length) + act_shape
        :param network: neural net with .run() that maps obs and act tensors into a (scalar) value tensor
        :return: tensor with shape = (batch_size, segment_length)
        """
        batchsize = tf.shape(obs_segments)[0]
        segment_length = tf.shape(obs_segments)[1]

        # Temporarily chop up segments into individual observations and actions
        obs = tf.reshape(obs_segments, (-1,) + self.obs_shape)
        acts = tf.reshape(act_segments, (-1,) + self.act_shape)

        # Run them through our neural network
        rewards = network.run(obs, acts)

        # Group the rewards back into their segments
        return tf.reshape(rewards, (batchsize, segment_length))
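    # Illustrative shape walk-through for _predict_rewards (the concrete sizes are just an
    # assumed example): with a batch of 8 segments of 45 frames, obs_shape == (11,) and
    # act_shape == (3,), obs_segments has shape (8, 45, 11) and is flattened to (360, 11)
    # before the MLP, which scores each (state, action) pair; the resulting per-step
    # rewards are then reshaped back to (8, 45).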
    def _build_model(self):
        """
        Our model takes in path segments with states and actions, and generates Q values.
        These Q values serve as predictions of the true reward.

        To compare two segments, we sum the Q values over each segment, which gives a
        predicted label for which segment is better. We then learn the model weights by
        checking these predictions against labels from an authority (either a human or a
        synthetic labeler).
        """
        # Set up observation placeholders
        self.segment_obs_placeholder = tf.placeholder(
            dtype=tf.float32, shape=(None, None) + self.obs_shape, name="obs_placeholder")
        self.segment_alt_obs_placeholder = tf.placeholder(
            dtype=tf.float32, shape=(None, None) + self.obs_shape, name="alt_obs_placeholder")

        self.segment_act_placeholder = tf.placeholder(
            dtype=tf.float32, shape=(None, None) + self.act_shape, name="act_placeholder")
        self.segment_alt_act_placeholder = tf.placeholder(
            dtype=tf.float32, shape=(None, None) + self.act_shape, name="alt_act_placeholder")

        # A vanilla multi-layer perceptron maps a (state, action) pair to a reward (Q-value)
        mlp = FullyConnectedMLP(self.obs_shape, self.act_shape)
        self.q_value = self._predict_rewards(self.segment_obs_placeholder, self.segment_act_placeholder, mlp)
        alt_q_value = self._predict_rewards(self.segment_alt_obs_placeholder, self.segment_alt_act_placeholder, mlp)

        # We use trajectory segments rather than individual (state, action) pairs because
        # video clips of segments are easier for humans to evaluate
        segment_reward_pred_left = tf.reduce_sum(self.q_value, axis=1)
        segment_reward_pred_right = tf.reduce_sum(alt_q_value, axis=1)
        reward_logits = tf.stack([segment_reward_pred_left, segment_reward_pred_right], axis=1)  # (batch_size, 2)

        self.labels = tf.placeholder(dtype=tf.int32, shape=(None,), name="comparison_labels")

        # delta = 1e-5
        # clipped_comparison_labels = tf.clip_by_value(self.comparison_labels, delta, 1.0 - delta)

        data_loss = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=reward_logits, labels=self.labels)

        self.loss_op = tf.reduce_mean(data_loss)

        global_step = tf.Variable(0, name='global_step', trainable=False)
        self.train_op = tf.train.AdamOptimizer().minimize(self.loss_op, global_step=global_step)

        return tf.get_default_graph()
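    # Note on the comparison loss in _build_model: with r_L = sum of predicted per-step
    # rewards for the left segment and r_R for the right, the softmax cross-entropy over
    # the logits [r_L, r_R] corresponds to a Bradley-Terry style preference model,
    # P(left preferred) = exp(r_L) / (exp(r_L) + exp(r_R)).
    # For example, r_L = 2.0 and r_R = 1.0 gives P(left preferred) ~= 0.73.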
    def predict_reward(self, path):
        """Predict the reward for each step in a given path."""
        with self.graph.as_default():
            q_value = self.sess.run(self.q_value, feed_dict={
                self.segment_obs_placeholder: np.asarray([path["obs"]]),
                self.segment_act_placeholder: np.asarray([path["actions"]]),
                K.learning_phase(): False
            })
        return q_value[0]

    def path_callback(self, path):
        path_length = len(path["obs"])
        self._steps_since_last_training += path_length

        self.agent_logger.log_episode(path)

        # We may be in a new part of the environment, so we take new segments to build comparisons from
        segment = sample_segment_from_path(path, int(self._frames_per_segment))
        if segment:
            self.recent_segments.append(segment)

        # If we need more comparisons, then we build them from our recent segments
        if len(self.comparison_collector) < int(self.label_schedule.n_desired_labels):
            self.comparison_collector.add_segment_pair(
                random.choice(self.recent_segments),
                random.choice(self.recent_segments))

        # Train our predictor every X steps
        if self._steps_since_last_training >= int(self._n_timesteps_per_predictor_training):
            self.train_predictor()
            self._steps_since_last_training = 0

    def train_predictor(self):
        self.comparison_collector.label_unlabeled_comparisons()

        minibatch_size = min(64, len(self.comparison_collector.labeled_decisive_comparisons))
        labeled_comparisons = random.sample(self.comparison_collector.labeled_decisive_comparisons, minibatch_size)
        left_obs = np.asarray([comp['left']['obs'] for comp in labeled_comparisons])
        left_acts = np.asarray([comp['left']['actions'] for comp in labeled_comparisons])
        right_obs = np.asarray([comp['right']['obs'] for comp in labeled_comparisons])
        right_acts = np.asarray([comp['right']['actions'] for comp in labeled_comparisons])
        labels = np.asarray([comp['label'] for comp in labeled_comparisons])

        with self.graph.as_default():
            _, loss = self.sess.run([self.train_op, self.loss_op], feed_dict={
                self.segment_obs_placeholder: left_obs,
                self.segment_act_placeholder: left_acts,
                self.segment_alt_obs_placeholder: right_obs,
                self.segment_alt_act_placeholder: right_acts,
                self.labels: labels,
                K.learning_phase(): True
            })
            self._elapsed_predictor_training_iters += 1
            self._write_training_summaries(loss)

    def _write_training_summaries(self, loss):
        self.agent_logger.log_simple("predictor/loss", loss)

        # Calculate correlation between true and predicted reward by running validation on recent episodes
        recent_paths = self.agent_logger.get_recent_paths_with_padding()
        if len(recent_paths) > 1 and self.agent_logger.summary_step % 10 == 0:  # Run validation every 10 iters
            validation_obs = np.asarray([path["obs"] for path in recent_paths])
            validation_acts = np.asarray([path["actions"] for path in recent_paths])
            q_value = self.sess.run(self.q_value, feed_dict={
                self.segment_obs_placeholder: validation_obs,
                self.segment_act_placeholder: validation_acts,
                K.learning_phase(): False
            })
            ep_reward_pred = np.sum(q_value, axis=1)
            reward_true = np.asarray([path['original_rewards'] for path in recent_paths])
            ep_reward_true = np.sum(reward_true, axis=1)
            self.agent_logger.log_simple("predictor/correlations", corrcoef(ep_reward_true, ep_reward_pred))

        self.agent_logger.log_simple("predictor/num_training_iters", self._elapsed_predictor_training_iters)
        self.agent_logger.log_simple("labels/desired_labels", self.label_schedule.n_desired_labels)
        self.agent_logger.log_simple("labels/total_comparisons", len(self.comparison_collector))
        self.agent_logger.log_simple(
            "labels/labeled_comparisons", len(self.comparison_collector.labeled_decisive_comparisons))
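# How an agent is expected to consume a predictor (sketch inferred from the interface
# above; the actual call sites live in the parallel_trpo and pposgd_mpi training code):
#   - predictor.predict_reward(path) supplies the learned per-step rewards that stand in
#     for the environment's true reward during policy optimization.
#   - predictor.path_callback(path) is called after each episode so the predictor can log
#     it, harvest fresh segments for new comparisons, and periodically retrain itself.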
def main():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('-e', '--env_id', required=True)
    parser.add_argument('-p', '--predictor', required=True)
    parser.add_argument('-n', '--name', required=True)
    parser.add_argument('-s', '--seed', default=1, type=int)
    parser.add_argument('-w', '--workers', default=4, type=int)
    parser.add_argument('-l', '--n_labels', default=None, type=int)
    parser.add_argument('-L', '--pretrain_labels', default=None, type=int)
    parser.add_argument('-t', '--num_timesteps', default=5e6, type=int)
    parser.add_argument('-a', '--agent', default="parallel_trpo", type=str)
    parser.add_argument('-i', '--pretrain_iters', default=10000, type=int)
    parser.add_argument('-V', '--no_videos', action="store_true")
    args = parser.parse_args()

    print("Setting things up...")

    env_id = args.env_id
    run_name = "%s/%s-%s" % (env_id, args.name, int(time()))
    summary_writer = make_summary_writer(run_name)

    env = make_with_torque_removed(env_id)

    num_timesteps = int(args.num_timesteps)
    experiment_name = slugify(args.name)

    if args.predictor == "rl":
        predictor = TraditionalRLRewardPredictor(summary_writer)
    else:
        agent_logger = AgentLogger(summary_writer)

        pretrain_labels = args.pretrain_labels if args.pretrain_labels else args.n_labels // 4

        if args.n_labels:
            label_schedule = LabelAnnealer(
                agent_logger,
                final_timesteps=num_timesteps,
                final_labels=args.n_labels,
                pretrain_labels=pretrain_labels)
        else:
            print("No label limit given. We will request one label every few seconds.")
            label_schedule = ConstantLabelSchedule(pretrain_labels=pretrain_labels)

        if args.predictor == "synth":
            comparison_collector = SyntheticComparisonCollector()
        elif args.predictor == "human":
            bucket = os.environ.get('RL_TEACHER_GCS_BUCKET')
            assert bucket and bucket.startswith("gs://"), "env variable RL_TEACHER_GCS_BUCKET must start with gs://"
            comparison_collector = HumanComparisonCollector(env_id, experiment_name=experiment_name)
        else:
            raise ValueError("Bad value for --predictor: %s" % args.predictor)

        predictor = ComparisonRewardPredictor(
            env,
            summary_writer,
            comparison_collector=comparison_collector,
            agent_logger=agent_logger,
            label_schedule=label_schedule,
        )

        print("Starting random rollouts to generate pretraining segments. No learning will take place...")
        pretrain_segments = segments_from_rand_rollout(
            env_id, make_with_torque_removed, n_desired_segments=pretrain_labels * 2,
            clip_length_in_seconds=CLIP_LENGTH, workers=args.workers)
        for i in range(pretrain_labels):  # Turn our random segments into comparisons
            comparison_collector.add_segment_pair(pretrain_segments[i], pretrain_segments[i + pretrain_labels])

        # Sleep until the human has labeled most of the pretraining comparisons
        while len(comparison_collector.labeled_comparisons) < int(pretrain_labels * 0.75):
            comparison_collector.label_unlabeled_comparisons()
            if args.predictor == "synth":
                print("%s synthetic labels generated... " % (len(comparison_collector.labeled_comparisons)))
            elif args.predictor == "human":
                print("%s/%s comparisons labeled. Please add labels w/ the human-feedback-api. Sleeping... " % (
                    len(comparison_collector.labeled_comparisons), pretrain_labels))
                sleep(5)

        # Start the actual training
        for i in range(args.pretrain_iters):
            predictor.train_predictor()  # Train on pretraining labels
            if i % 100 == 0:
                print("%s/%s predictor pretraining iters... " % (i, args.pretrain_iters))

    # Wrap the predictor to capture videos every so often:
    if not args.no_videos:
        predictor = SegmentVideoRecorder(predictor, env, save_dir=osp.join('/tmp/rl_teacher_vids', run_name))

    # We use a vanilla agent from openai/baselines that contains a single change that blinds it to the true reward.
    # The single changed section is in `rl_teacher/agent/trpo/core.py`
    print("Starting joint training of predictor and agent")
    if args.agent == "parallel_trpo":
        train_parallel_trpo(
            env_id=env_id,
            make_env=make_with_torque_removed,
            predictor=predictor,
            summary_writer=summary_writer,
            workers=args.workers,
            runtime=(num_timesteps / 1000),
            max_timesteps_per_episode=get_timesteps_per_episode(env),
            timesteps_per_batch=8000,
            max_kl=0.001,
            seed=args.seed,
        )
    elif args.agent == "pposgd_mpi":
        def make_env():
            return make_with_torque_removed(env_id)

        train_pposgd_mpi(make_env, num_timesteps=num_timesteps, seed=args.seed, predictor=predictor)
    else:
        raise ValueError("%s is not a valid choice for args.agent" % args.agent)


if __name__ == '__main__':
    main()
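# Example invocations (illustrative; the script path, environment id, and run names are
# placeholders, not values prescribed by this module):
#   python rl_teacher/teach.py -p rl -e Hopper-v1 -n base-rl
#       Train an agent on the environment's true reward (no reward predictor).
#   python rl_teacher/teach.py -p synth -l 1400 -e Hopper-v1 -n syn-1400
#       Train against a predictor fit to synthetically labeled comparisons, annealing
#       toward 1400 total labels.
#   python rl_teacher/teach.py -p human -L 100 -e Hopper-v1 -n human-run
#       Train against human comparisons collected via the human-feedback-api
#       (requires RL_TEACHER_GCS_BUCKET to be set to a gs:// bucket).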