# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# CosineAnnealing lr scheduler adapted from NeMo:
# https://github.com/NVIDIA/NeMo/blob/main/nemo/core/optim/lr_scheduler.py#L403
# Used for benchmarking against NeMo.

import math
import warnings

from torch.optim.lr_scheduler import _LRScheduler


class WarmupAnnealHoldPolicy(_LRScheduler):
    """Adds warmup kwargs and warmup logic to an lr policy.

    All arguments should be passed as kwargs for clarity.

    Args:
        warmup_steps: Number of training steps in the warmup stage.
        warmup_ratio: Ratio of warmup steps to total steps.
        max_steps: Total number of steps while training, or `None` for
            infinite training.
        min_lr: Minimum lr to hold the learning rate at after decay.
        constant_steps: Number of steps to keep the lr constant.
        constant_ratio: Ratio of steps to keep the lr constant.
    """

    def __init__(
        self,
        optimizer,
        *,
        warmup_steps=None,
        warmup_ratio=None,
        constant_steps=None,
        constant_ratio=None,
        max_steps=None,
        min_lr=0.0,
        last_epoch=-1,
    ):
        assert not (
            warmup_steps is not None and warmup_ratio is not None
        ), "Either use a particular number of warmup steps or a ratio"
        assert not (
            constant_steps is not None and constant_ratio is not None
        ), "Either use constant_steps or constant_ratio"
        assert warmup_ratio is None or max_steps is not None, "If there is a ratio, there should be a total number of steps"

        # It is necessary to assign all attributes *before* __init__,
        # as the class is wrapped by an inner class.
        self.max_steps = max_steps

        if warmup_steps is not None:
            self.warmup_steps = warmup_steps
        elif warmup_ratio is not None:
            self.warmup_steps = int(warmup_ratio * max_steps)
        else:
            self.warmup_steps = 0

        if constant_steps is not None:
            self.constant_steps = constant_steps
        elif constant_ratio is not None:
            self.constant_steps = int(constant_ratio * max_steps)
        else:
            self.constant_steps = 0

        self.decay_steps = max_steps - (self.constant_steps + self.warmup_steps)

        self.min_lr = min_lr
        super().__init__(optimizer, last_epoch)

    def get_lr(self):
        if not self._get_lr_called_within_step:
            warnings.warn(
                "To get the last learning rate computed by the scheduler, please use `get_last_lr()`.", UserWarning
            )

        step = self.last_epoch

        # Warmup steps
        if self.warmup_steps > 0 and step <= self.warmup_steps:
            return self._get_warmup_lr(step)

        # Constant steps after warmup and decay
        if self.constant_steps > 0 and (self.warmup_steps + self.decay_steps) < step <= self.max_steps:
            return self._get_constant_lr(step)

        # Min lr after max steps of updates
        if step > self.max_steps:
            return [self.min_lr for _ in self.base_lrs]

        return self._get_lr(step)

    def _get_warmup_lr(self, step):
        lr_val = (step + 1) / (self.warmup_steps + 1)
        return [initial_lr * lr_val for initial_lr in self.base_lrs]

    def _get_constant_lr(self, step):
        return [self.min_lr for _ in self.base_lrs]

    def _get_lr(self, step):
        """Simple constant lr policy."""
        return self.base_lrs


def _cosine_annealing(initial_lr, step, max_steps, min_lr):
    # Cosine decay from `initial_lr` at step 0 down to `min_lr` at `max_steps`.
    mult = 0.5 * (1 + math.cos(math.pi * step / max_steps))
    out_lr = (initial_lr - min_lr) * mult + min_lr
    return out_lr


def _linear_warmup_with_cosine_annealing(max_lr, warmup_steps, step, decay_steps, min_lr):
    assert max_lr > min_lr

    # Use linear warmup for the initial part.
    if warmup_steps > 0 and step <= warmup_steps:
        return max_lr * float(step) / float(warmup_steps)

    # For any step beyond `warmup_steps + decay_steps`, use `min_lr`.
    if step > warmup_steps + decay_steps:
        return min_lr

    # If we are done with the warmup period, use the decay style.
    num_steps_ = step - warmup_steps
    decay_steps_ = decay_steps
    decay_ratio = float(num_steps_) / float(decay_steps_)
    assert decay_ratio >= 0.0
    assert decay_ratio <= 1.0

    delta_lr = max_lr - min_lr
    coeff = 0.5 * (math.cos(math.pi * decay_ratio) + 1.0)

    return min_lr + coeff * delta_lr


class CosineAnnealing(WarmupAnnealHoldPolicy):
    def __init__(self, optimizer, *, max_steps, min_lr=0, last_epoch=-1, **kwargs):
        super().__init__(optimizer=optimizer, max_steps=max_steps, last_epoch=last_epoch, min_lr=min_lr, **kwargs)

    def _get_lr(self, step):
        for initial_lr in self.base_lrs:
            if initial_lr < self.min_lr:
                raise ValueError(
                    f"{self} received an initial learning rate that was lower than the minimum learning rate."
                )

        if self.constant_steps is None or self.constant_steps == 0:
            new_lrs = [
                _cosine_annealing(
                    initial_lr=initial_lr,
                    step=step - self.warmup_steps,
                    max_steps=self.max_steps - self.warmup_steps,
                    min_lr=self.min_lr,
                )
                for initial_lr in self.base_lrs
            ]
        else:
            new_lrs = self._get_linear_warmup_with_cosine_annealing_lr(step)
        return new_lrs

    def _get_warmup_lr(self, step):
        if self.constant_steps is None or self.constant_steps == 0:
            return super()._get_warmup_lr(step)
        else:
            # Use linear warmup for the initial part.
            return self._get_linear_warmup_with_cosine_annealing_lr(step)

    def _get_constant_lr(self, step):
        # Only called when `constant_steps` > 0.
        return self._get_linear_warmup_with_cosine_annealing_lr(step)

    def _get_linear_warmup_with_cosine_annealing_lr(self, step):
        # Cosine schedule for Megatron-LM: slightly different warmup schedule + constant LR at the end.
        new_lrs = [
            _linear_warmup_with_cosine_annealing(
                max_lr=self.base_lrs[0],
                warmup_steps=self.warmup_steps,
                step=step,
                decay_steps=self.decay_steps,
                min_lr=self.min_lr,
            )
            for _ in self.base_lrs
        ]
        return new_lrs
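

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original NeMo code). It shows how the
# scheduler is driven from a training loop: `optimizer.step()` first, then
# `scheduler.step()` once per training step. The model and the `max_steps`,
# `warmup_steps`, `constant_steps`, and `min_lr` values below are illustrative
# assumptions, not values taken from the source.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import torch

    model = torch.nn.Linear(4, 4)
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    scheduler = CosineAnnealing(
        optimizer,
        max_steps=1000,      # total training steps (assumed)
        warmup_steps=100,    # linear warmup from 0 to the base lr (assumed)
        constant_steps=50,   # hold `min_lr` for the final steps (assumed)
        min_lr=1e-5,
    )

    for _ in range(5):
        optimizer.step()     # in a real loop this follows loss.backward()
        scheduler.step()
        print(scheduler.get_last_lr())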